From d4d4ddadee8b7fcba17ea25b008fa75185336efc Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 29 Jun 2018 09:25:57 -0400 Subject: [PATCH] NIFI-5361: When submitting many processors to start, calculate the 'timeout timestamp' immediately before calling @OnScheduled method, after the task has been scheduled to run, instead of before the task has a chance to run. This closes #2831 --- .../org/apache/nifi/controller/StandardProcessorNode.java | 8 ++++++-- .../controller/scheduling/StandardProcessScheduler.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 6a831d00a2..c2b98e6458 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -1482,12 +1482,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable final Processor processor = getProcessor(); final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor); - final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis; + // Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run. + final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE); // Create a task to invoke the @OnScheduled annotation of the processor final Callable startupTask = () -> { LOG.debug("Invoking @OnScheduled methods of {}", processor); + // Now that the task has been scheduled, set the timeout + completionTimestampRef.set(System.currentTimeMillis() + onScheduleTimeoutMillis); + try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) { try { activateThread(); @@ -1572,7 +1576,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable return; } - monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp); + monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get()); } }; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 8f7eb2fdb0..b23e76356e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -319,7 +319,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public Future scheduleTask(final Callable task) { lifecycleState.incrementActiveThreadCount(null); - return componentMonitoringThreadPool.submit(task); + return componentLifeCycleThreadPool.submit(task); } @Override