parent
8711a092bf
commit
f13f55ede3
|
@ -490,8 +490,6 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void onStart(long now, ActionListener<Void> listener) {
|
protected void onStart(long now, ActionListener<Void> listener) {
|
||||||
// Reset our failure count as we are starting again
|
|
||||||
failureCount.set(0);
|
|
||||||
// On each run, we need to get the total number of docs and reset the count of processed docs
|
// On each run, we need to get the total number of docs and reset the count of processed docs
|
||||||
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
|
// Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather
|
||||||
// the progress here, and not in the executor.
|
// the progress here, and not in the executor.
|
||||||
|
@ -630,6 +628,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
try {
|
try {
|
||||||
super.onFinish(listener);
|
super.onFinish(listener);
|
||||||
long checkpoint = transformTask.currentCheckpoint.incrementAndGet();
|
long checkpoint = transformTask.currentCheckpoint.incrementAndGet();
|
||||||
|
// Reset our failure count as we have finished and may start again with a new checkpoint
|
||||||
|
failureCount.set(0);
|
||||||
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
|
auditor.info(transformTask.getTransformId(), "Finished indexing for data frame transform checkpoint [" + checkpoint + "].");
|
||||||
logger.info(
|
logger.info(
|
||||||
"Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
|
"Finished indexing for data frame transform [" + transformTask.getTransformId() + "] checkpoint [" + checkpoint + "]");
|
||||||
|
|
Loading…
Reference in New Issue