diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 5b237ff72e..af801bbb8c 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -263,7 +263,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) { - scheduleState.incrementActiveThreadCount(); + final int newThreadCount = scheduleState.incrementActiveThreadCount(); + if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; + } + try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { worker.onTrigger(processContext, sessionFactory); @@ -293,7 +303,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) { - scheduleState.incrementActiveThreadCount(); + final int newThreadCount = scheduleState.incrementActiveThreadCount(); + if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { + // its possible that the worker queue could give us a worker node that is eligible to run based + // on the number of threads but another thread has already incremented the thread count, result in + // reaching the maximum number of threads. we won't know this until we atomically increment the thread count + // on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would + // result in using more than the maximum number of defined threads + scheduleState.decrementActiveThreadCount(); + return; + } + try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { worker.onTrigger(processContext, sessionFactory); diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java index c10de83ba0..eb5a437b71 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/ScheduleState.java @@ -31,12 +31,12 @@ public class ScheduleState { private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private volatile long lastStopTime = -1; - public void incrementActiveThreadCount() { - activeThreadCount.incrementAndGet(); + public int incrementActiveThreadCount() { + return activeThreadCount.incrementAndGet(); } - public void decrementActiveThreadCount() { - activeThreadCount.decrementAndGet(); + public int decrementActiveThreadCount() { + return activeThreadCount.decrementAndGet(); } public int getActiveThreadCount() {