[ML Data Frame] Persist data frame after state changes (#42347)

This commit is contained in:
David Kyle 2019-05-22 15:35:08 +01:00
parent f696769a39
commit 075cc7c5cf
1 changed files with 10 additions and 18 deletions

View File

@ -444,7 +444,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
private final DataFrameTransformsCheckpointService transformsCheckpointService;
private final String transformId;
private final DataFrameTransformTask transformTask;
private volatile DataFrameIndexerTransformStats previouslyPersistedStats = null;
private final AtomicInteger failureCount;
// Keeps track of the last exception that was written to our audit, keeps us from spamming the audit index
private volatile String lastAuditedExceptionMessage = null;
@ -552,25 +551,18 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
// only every-so-often when doing the bulk indexing calls. See AsyncTwoPhaseIndexer#onBulkResponse for current periodicity
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> updateClusterStateListener = ActionListener.wrap(
task -> {
// Only persist the stats if something has actually changed
if (previouslyPersistedStats == null || previouslyPersistedStats.equals(getStats()) == false) {
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
transformsConfigManager.putOrUpdateTransformStats(
new DataFrameTransformStateAndStats(transformId, state, getStats(),
DataFrameTransformCheckpointingInfo.EMPTY), // TODO should this be null
ActionListener.wrap(
r -> {
previouslyPersistedStats = getStats();
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
r -> {
next.run();
},
statsExc -> {
logger.error("Updating stats of transform [" + transformConfig.getId() + "] failed", statsExc);
next.run();
}
));
// The stats that we have previously written to the doc is the same as as it is now, no need to update it
} else {
next.run();
}
},
exc -> {
logger.error("Updating persistent state of transform [" + transformConfig.getId() + "] failed", exc);