[ML] Correct indexer state on task re-allocation (#41724) (#41751)

This commit is contained in:
Benjamin Trent 2019-05-02 12:01:59 -05:00 committed by GitHub
parent ab9154005b
commit 33b4032fab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 1 deletions

View File

@ -114,7 +114,7 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
new DataFrameTransformTask.ClientDataFrameIndexerBuilder()
.setAuditor(auditor)
.setClient(client)
.setIndexerState(transformState == null ? IndexerState.STOPPED : transformState.getIndexerState())
.setIndexerState(currentIndexerState(transformState))
.setInitialPosition(transformState == null ? null : transformState.getPosition())
// If the state is `null` that means this is a "first run". We can safely assume the
// task will attempt to gather the initial progress information
@ -184,6 +184,26 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
transformsConfigManager.getTransformConfiguration(transformId, getTransformConfigListener);
}
private static IndexerState currentIndexerState(DataFrameTransformState previousState) {
if (previousState == null) {
return IndexerState.STOPPED;
}
switch(previousState.getIndexerState()){
// If it is STARTED or INDEXING we want to make sure we revert to started
// Otherwise, the internal indexer will never get scheduled and execute
case STARTED:
case INDEXING:
return IndexerState.STARTED;
// If we are STOPPED, STOPPING, or ABORTING and just started executing on this node,
// then it is safe to say we should be STOPPED
case STOPPED:
case STOPPING:
case ABORTING:
default:
return IndexerState.STOPPED;
}
}
private void markAsFailed(DataFrameTransformTask task, String reason) {
CountDownLatch latch = new CountDownLatch(1);