NIFI-5361: When submitting many processors to start, calculate the 'timeout timestamp' immediately before calling @OnScheduled method, after the task has been scheduled to run, instead of before the task has a chance to run.

This closes #2831
This commit is contained in:
Mark Payne 2018-06-29 09:25:57 -04:00 committed by Matt Gilman
parent 790102d8b1
commit d4d4ddadee
No known key found for this signature in database
GPG Key ID: DF61EC19432AEE37
2 changed files with 7 additions and 3 deletions

View File

@ -1482,12 +1482,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
final Processor processor = getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
final long completionTimestamp = System.currentTimeMillis() + onScheduleTimeoutMillis;
// Completion Timestamp is set to MAX_VALUE because we don't want to timeout until the task has a chance to run.
final AtomicLong completionTimestampRef = new AtomicLong(Long.MAX_VALUE);
// Create a task to invoke the @OnScheduled annotation of the processor
final Callable<Void> startupTask = () -> {
LOG.debug("Invoking @OnScheduled methods of {}", processor);
// Now that the task has been scheduled, set the timeout
completionTimestampRef.set(System.currentTimeMillis() + onScheduleTimeoutMillis);
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(processor.getClass(), processor.getIdentifier())) {
try {
activateThread();
@ -1572,7 +1576,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
return;
}
monitorAsyncTask(taskFuture, monitoringFuture, completionTimestamp);
monitorAsyncTask(taskFuture, monitoringFuture, completionTimestampRef.get());
}
};

View File

@ -319,7 +319,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public Future<?> scheduleTask(final Callable<?> task) {
lifecycleState.incrementActiveThreadCount(null);
return componentMonitoringThreadPool.submit(task);
return componentLifeCycleThreadPool.submit(task);
}
@Override