From d664708dd336b7e133a39f69a790aa40f087d347 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 24 May 2022 13:25:37 -0400 Subject: [PATCH] NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors This closes #6076 Signed-off-by: David Handermann --- .../scheduling/StandardProcessScheduler.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index d5d136b176..4be5820069 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements ProcessScheduler { } taskNode.verifyCanStop(); + + // Increment the Active Thread Count in order to ensure that we don't consider the Reporting Task completely stopped until we've run + // all lifecycle methods, such as @OnStopped + lifecycleState.incrementActiveThreadCount(null); + final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy()); final ReportingTask reportingTask = taskNode.getReportingTask(); taskNode.setScheduledState(ScheduledState.STOPPED); @@ -304,10 +309,14 @@ public final class StandardProcessScheduler implements ProcessScheduler { agent.unschedule(taskNode, lifecycleState); - if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) { + // If active thread count == 1, that indicates that all execution threads have completed. We use 1 here instead of 0 because + // when the Reporting Task is unscheduled, we immediately increment the thread count to 1 as an indicator that we've not completely finished. + if (lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) { ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext); future.complete(null); } + + lifecycleState.decrementActiveThreadCount(); } } };