NIFI-381: Ensure that we always properly account for number of active threads

This commit is contained in:
Mark Payne 2015-02-25 14:07:21 -05:00
parent ca23ad8eaa
commit 1af8c1e22a
1 changed files with 26 additions and 24 deletions

View File

@ -159,31 +159,33 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
}
} finally {
if (batch) {
rawSession.commit();
}
final long processingNanos = System.nanoTime() - startNanos;
// if the processor is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
flowController.heartbeat();
}
}
scheduleState.decrementActiveThreadCount();
try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
context.getFlowFileEventRepository().updateRepository(procEvent);
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
logger.error("", e);
if (batch) {
rawSession.commit();
}
final long processingNanos = System.nanoTime() - startNanos;
// if the processor is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext);
flowController.heartbeat();
}
}
try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
context.getFlowFileEventRepository().updateRepository(procEvent);
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
logger.error("", e);
}
} finally {
scheduleState.decrementActiveThreadCount();
}
}