[ML][Data Frame] pull state and states for indexer from index (#42856) (#42935)

* [ML][Data Frame] pull state and states for indexer from index

* Update DataFrameTransformTask.java
This commit is contained in:
Benjamin Trent 2019-06-06 08:51:11 -05:00 committed by GitHub
parent 02e6acf2d2
commit 1f1868a294
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 15 deletions

View File

@ -111,12 +111,6 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId) new DataFrameTransformTask.ClientDataFrameIndexerBuilder(transformId)
.setAuditor(auditor) .setAuditor(auditor)
.setClient(client) .setClient(client)
.setIndexerState(currentIndexerState(transformPTaskState))
// If the transform persistent task state is `null` that means this is a "first run".
// If we have state then the task has relocated from another node in which case this
// state is preferred
.setInitialPosition(transformPTaskState == null ? null : transformPTaskState.getPosition())
.setProgress(transformPTaskState == null ? null : transformPTaskState.getProgress())
.setTransformsCheckpointService(dataFrameTransformsCheckpointService) .setTransformsCheckpointService(dataFrameTransformsCheckpointService)
.setTransformsConfigManager(transformsConfigManager); .setTransformsConfigManager(transformsConfigManager);
@ -132,18 +126,22 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
// Schedule execution regardless // Schedule execution regardless
ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap( ActionListener<DataFrameTransformStateAndStats> transformStatsActionListener = ActionListener.wrap(
stateAndStats -> { stateAndStats -> {
indexerBuilder.setInitialStats(stateAndStats.getTransformStats()); logger.trace("[{}] initializing state and stats: [{}]", transformId, stateAndStats.toString());
if (transformPTaskState == null) { // prefer the persistent task state indexerBuilder.setInitialStats(stateAndStats.getTransformStats())
indexerBuilder.setInitialPosition(stateAndStats.getTransformState().getPosition()); .setInitialPosition(stateAndStats.getTransformState().getPosition())
indexerBuilder.setProgress(stateAndStats.getTransformState().getProgress()); .setProgress(stateAndStats.getTransformState().getProgress())
} .setIndexerState(currentIndexerState(stateAndStats.getTransformState()));
logger.info("[{}] Loading existing state: [{}], position [{}]",
transformId,
stateAndStats.getTransformState(),
stateAndStats.getTransformState().getPosition());
final Long checkpoint = previousCheckpoint != null ? previousCheckpoint : stateAndStats.getTransformState().getCheckpoint(); final Long checkpoint = stateAndStats.getTransformState().getCheckpoint();
startTask(buildTask, indexerBuilder, checkpoint, startTaskListener); startTask(buildTask, indexerBuilder, checkpoint, startTaskListener);
}, },
error -> { error -> {
if (error instanceof ResourceNotFoundException == false) { if (error instanceof ResourceNotFoundException == false) {
logger.error("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error); logger.warn("Unable to load previously persisted statistics for transform [" + params.getId() + "]", error);
} }
startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener); startTask(buildTask, indexerBuilder, previousCheckpoint, startTaskListener);
} }

View File

@ -89,12 +89,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
String initialReason = null; String initialReason = null;
long initialGeneration = 0; long initialGeneration = 0;
Map<String, Object> initialPosition = null; Map<String, Object> initialPosition = null;
logger.trace("[{}] init, got state: [{}]", transform.getId(), state != null);
if (state != null) { if (state != null) {
initialTaskState = state.getTaskState(); initialTaskState = state.getTaskState();
initialReason = state.getReason(); initialReason = state.getReason();
final IndexerState existingState = state.getIndexerState(); final IndexerState existingState = state.getIndexerState();
logger.info("[{}] Loading existing state: [{}], position [{}]", transform.getId(), existingState, state.getPosition());
if (existingState.equals(IndexerState.INDEXING)) { if (existingState.equals(IndexerState.INDEXING)) {
// reset to started as no indexer is running // reset to started as no indexer is running
initialState = IndexerState.STARTED; initialState = IndexerState.STARTED;
@ -213,6 +211,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
getIndexer().getProgress()); getIndexer().getProgress());
logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString()); logger.info("Updating state for data frame transform [{}] to [{}]", transform.getId(), state.toString());
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
// we could not read the previous state information from said index.
persistStateToClusterState(state, ActionListener.wrap( persistStateToClusterState(state, ActionListener.wrap(
task -> { task -> {
auditor.info(transform.getId(), auditor.info(transform.getId(),
@ -301,6 +303,10 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
taskState.set(DataFrameTransformTaskState.FAILED); taskState.set(DataFrameTransformTaskState.FAILED);
stateReason.set(reason); stateReason.set(reason);
auditor.error(transform.getId(), reason); auditor.error(transform.getId(), reason);
// Even though the indexer information is persisted to an index, we still need DataFrameTransformTaskState in the clusterstate
// This keeps track of STARTED, FAILED, STOPPED
// This is because a FAILED state can occur because we cannot read the config from the internal index, which would imply that
// we could not read the previous state information from said index.
persistStateToClusterState(getState(), ActionListener.wrap( persistStateToClusterState(getState(), ActionListener.wrap(
r -> listener.onResponse(null), r -> listener.onResponse(null),
listener::onFailure listener::onFailure