From 1f1868a294b89f6d976c37269823b510477bafbb Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 6 Jun 2019 08:51:11 -0500 Subject: [PATCH] [ML][Data Frame] pull state and states for indexer from index (#42856) (#42935) * [ML][Data Frame] pull state and states for indexer from index * Update DataFrameTransformTask.java --- ...FrameTransformPersistentTasksExecutor.java | 24 +++++++++---------- .../transforms/DataFrameTransformTask.java | 10 ++++++-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java index 443d499dfef..97d4f9a818b 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java @@ -111,12 +111,6 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) .setAuditor(auditor) .setClient(client) - .setIndexerState(currentIndexerState(transformPTaskState)) - // If the transform persistent task state is `null` that means this is a "first run". - // If we have state then the task has relocated from another node in which case this - // state is preferred - .setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition()) - .setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress()) .setTransformsCheckpointService(dataFrameTransformsCheckpointService) .setTransformsConfigManager(transformsConfigManager); @@ -132,18 +126,22 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx // Schedule execution regardless ActionListener transformStatsActionListener = ActionListener.wrap( stateAndStats -> { - indexerBuilder.setInitialStats(stateAndStats.getTransformStats()); - if (transformPTaskState == null) { // prefer the persistent task state - indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition()); - indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress()); - } + logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString()); + indexerBuilder.setInitialStats(stateAndStats.getTransformStats()) + .setInitialPosition(stateAndStats.getTransformState().getPosition()) + .setProgress(stateAndStats.getTransformState().getProgress()) + .setIndexerState(currentIndexerState(stateAndStats.getTransformState())); + logger.info("[{}] Loading existing state: [{}], position [{}]", + transformId, + stateAndStats.getTransformState(), + stateAndStats.getTransformState().getPosition()); - final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint(); + final Long checkpoint = stateAndStats.getTransformState().getCheckpoint(); startTask(buildTask, indexerBuilder, checkpoint, startTaskListener); }, error -> { if (error instanceof ResourceNotFoundException == false) { - logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); + logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); } startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index b3464b72471..b351c6ad1fb 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -89,12 +89,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S String initialReason = null; long initialGeneration = 0; Map initialPosition = null; - logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null); if (state != null) { initialTaskState = state.getTaskState(); initialReason = state.getReason(); final IndexerState existingState = state.getIndexerState(); - logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition()); if (existingState.equals(IndexerState.INDEXING)) { // reset to started as no indexer is running initialState = IndexerState.STARTED; @@ -213,6 +211,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S getIndexer().getProgress()); logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); + // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate + // This keeps track of STARTED, FAILED, STOPPED + // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that + // we could not read the previous state information from said index. persistStateToClusterState(state, ActionListener.wrap( task -> { auditor.info(transform.getId(), @@ -301,6 +303,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S taskState.set(DataFrameTransformTaskState.FAILED); stateReason.set(reason); auditor.error(transform.getId(), reason); + // Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate + // This keeps track of STARTED, FAILED, STOPPED + // This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that + // we could not read the previous state information from said index. persistStateToClusterState(getState(), ActionListener.wrap( r -> listener.onResponse(null), listener::onFailure