NIFI-54: When incrementing active thread count, if the value exceeds max, do not run

This commit is contained in:
Mark Payne 2014-12-10 11:57:33 -05:00
parent 8254b75437
commit 97f8ab0cc5
2 changed files with 26 additions and 6 deletions

View File

@ -263,7 +263,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
} }
private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) { private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) {
scheduleState.incrementActiveThreadCount(); final int newThreadCount = scheduleState.incrementActiveThreadCount();
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
// its possible that the worker queue could give us a worker node that is eligible to run based
// on the number of threads but another thread has already incremented the thread count, result in
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
scheduleState.decrementActiveThreadCount();
return;
}
try { try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
worker.onTrigger(processContext, sessionFactory); worker.onTrigger(processContext, sessionFactory);
@ -293,7 +303,17 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent {
} }
private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) { private void trigger(final ProcessorNode worker, final ProcessContext context, final ScheduleState scheduleState, final StandardProcessContext processContext, final ProcessSessionFactory sessionFactory) {
scheduleState.incrementActiveThreadCount(); final int newThreadCount = scheduleState.incrementActiveThreadCount();
if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) {
// its possible that the worker queue could give us a worker node that is eligible to run based
// on the number of threads but another thread has already incremented the thread count, result in
// reaching the maximum number of threads. we won't know this until we atomically increment the thread count
// on the Schedule State, so we check it here. in this case, we cannot trigger the Processor, as doing so would
// result in using more than the maximum number of defined threads
scheduleState.decrementActiveThreadCount();
return;
}
try { try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
worker.onTrigger(processContext, sessionFactory); worker.onTrigger(processContext, sessionFactory);

View File

@ -31,12 +31,12 @@ public class ScheduleState {
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1; private volatile long lastStopTime = -1;
public void incrementActiveThreadCount() { public int incrementActiveThreadCount() {
activeThreadCount.incrementAndGet(); return activeThreadCount.incrementAndGet();
} }
public void decrementActiveThreadCount() { public int decrementActiveThreadCount() {
activeThreadCount.decrementAndGet(); return activeThreadCount.decrementAndGet();
} }
public int getActiveThreadCount() { public int getActiveThreadCount() {