NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors

This closes #6076

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Payne 2022-05-24 13:25:37 -04:00 committed by exceptionfactory
parent 205c58230b
commit d664708dd3
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 10 additions and 1 deletions

View File

@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
taskNode.verifyCanStop();
// Increment the Active Thread Count in order to ensure that we don't consider the Reporting Task completely stopped until we've run
// all lifecycle methods, such as @OnStopped
lifecycleState.incrementActiveThreadCount(null);
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask();
taskNode.setScheduledState(ScheduledState.STOPPED);
@ -304,10 +309,14 @@ public final class StandardProcessScheduler implements ProcessScheduler {
agent.unschedule(taskNode, lifecycleState);
if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) {
// If active thread count == 1, that indicates that all execution threads have completed. We use 1 here instead of 0 because
// when the Reporting Task is unscheduled, we immediately increment the thread count to 1 as an indicator that we've not completely finished.
if (lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
future.complete(null);
}
lifecycleState.decrementActiveThreadCount();
}
}
};