[ML-DataFrame] fix starting a batch data frame after stopping at runtime (#45340) (#45381)

fix loading of next checkpoint after data frame transform has been stopped/started within one run

closes #45339
This commit is contained in:
Hendrik Muhs 2019-08-09 20:30:11 +02:00 committed by GitHub
parent 27497ff75f
commit bf4da6c6ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 21 deletions

View File

@ -151,15 +151,6 @@ public class DataFrameTransformState implements Task.Status, PersistentTaskState
return progress; return progress;
} }
/**
* Get the in-progress checkpoint
*
* @return checkpoint in progress or 0 if task/indexer is not active
*/
public long getInProgressCheckpoint() {
return indexerState.equals(IndexerState.INDEXING) ? checkpoint + 1L : 0;
}
public String getReason() { public String getReason() {
return reason; return reason;
} }

View File

@ -146,11 +146,20 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
// <5> load next checkpoint // <5> load next checkpoint
ActionListener<DataFrameTransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap( ActionListener<DataFrameTransformCheckpoint> getTransformNextCheckpointListener = ActionListener.wrap(
nextCheckpoint -> { nextCheckpoint -> {
indexerBuilder.setNextCheckpoint(nextCheckpoint);
if (nextCheckpoint.isEmpty()) {
// extra safety: reset position and progress if next checkpoint is empty
// prevents a failure if for some reason the next checkpoint has been deleted
indexerBuilder.setInitialPosition(null);
indexerBuilder.setProgress(null);
} else {
logger.trace("[{}] Loaded next checkpoint [{}] found, starting the task", transformId,
nextCheckpoint.getCheckpoint());
indexerBuilder.setNextCheckpoint(nextCheckpoint);
}
final long lastCheckpoint = stateHolder.get().getCheckpoint(); final long lastCheckpoint = stateHolder.get().getCheckpoint();
logger.trace("[{}] No next checkpoint found, starting the task", transformId);
startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener);
}, },
error -> { error -> {
@ -166,14 +175,10 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
lastCheckpoint -> { lastCheckpoint -> {
indexerBuilder.setLastCheckpoint(lastCheckpoint); indexerBuilder.setLastCheckpoint(lastCheckpoint);
final long nextCheckpoint = stateHolder.get().getInProgressCheckpoint(); logger.trace("[{}] Loaded last checkpoint [{}], looking for next checkpoint", transformId,
lastCheckpoint.getCheckpoint());
if (nextCheckpoint > 0) { transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint.getCheckpoint() + 1,
transformsConfigManager.getTransformCheckpoint(transformId, nextCheckpoint, getTransformNextCheckpointListener); getTransformNextCheckpointListener);
} else {
logger.trace("[{}] No next checkpoint found, starting the task", transformId);
startTask(buildTask, indexerBuilder, lastCheckpoint.getCheckpoint(), startTaskListener);
}
}, },
error -> { error -> {
String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId); String msg = DataFrameMessages.getMessage(DataFrameMessages.FAILED_TO_LOAD_TRANSFORM_CHECKPOINT, transformId);
@ -201,8 +206,8 @@ public class DataFrameTransformPersistentTasksExecutor extends PersistentTasksEx
final long lastCheckpoint = stateHolder.get().getCheckpoint(); final long lastCheckpoint = stateHolder.get().getCheckpoint();
if (lastCheckpoint == 0) { if (lastCheckpoint == 0) {
logger.trace("[{}] No checkpoint found, starting the task", transformId); logger.trace("[{}] No last checkpoint found, looking for next checkpoint", transformId);
startTask(buildTask, indexerBuilder, lastCheckpoint, startTaskListener); transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint + 1, getTransformNextCheckpointListener);
} else { } else {
logger.trace ("[{}] Restore last checkpoint: [{}]", transformId, lastCheckpoint); logger.trace ("[{}] Restore last checkpoint: [{}]", transformId, lastCheckpoint);
transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint, getTransformLastCheckpointListener); transformsConfigManager.getTransformCheckpoint(transformId, lastCheckpoint, getTransformLastCheckpointListener);