From e01d59a462e3047ccd8bbfdc608631bfd90db533 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 13 Sep 2017 11:01:07 -0400 Subject: [PATCH] NIFI-4379: Event-Driven threads are constantly active, polling a queue to see if there is any work to do. Instead of getting number of active threads from the ScheduledExecutor for these threads, updated code to use an AtomicInteger that is incremented when there's work to do and decremented once complete Signed-off-by: Pierre Villard This closes #2153. --- .../nifi/controller/FlowController.java | 9 +- .../EventDrivenSchedulingAgent.java | 192 ++++++++++-------- 2 files changed, 108 insertions(+), 93 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 9c181ffaf1..5d65f89b88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -280,6 +280,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final AtomicInteger maxEventDrivenThreads; private final AtomicReference timerDrivenEngineRef; private final AtomicReference eventDrivenEngineRef; + private final EventDrivenSchedulingAgent eventDrivenSchedulingAgent; private final ContentRepository contentRepository; private final FlowFileRepository flowFileRepository; @@ -502,8 +503,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); - processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); + + eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent( + eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor); + processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.nifiProperties); @@ -3582,7 +3585,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public int getActiveThreadCount() { final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount(); - final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount(); + final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount(); return timerDrivenCount + eventDrivenCount; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 58d25bb2fb..7b2e9662cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -60,6 +60,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private final EventDrivenWorkerQueue workerQueue; private final ProcessContextFactory contextFactory; private final AtomicInteger maxThreadCount; + private final AtomicInteger activeThreadCount = new AtomicInteger(0); private final StringEncryptor encryptor; private volatile String adminYieldDuration = "1 sec"; @@ -78,11 +79,15 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { this.encryptor = encryptor; for (int i = 0; i < maxThreadCount; i++) { - final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); + final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount); flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS); } } + public int getActiveThreadCount() { + return activeThreadCount.get(); + } + private StateManager getStateManager(final String componentId) { return stateManagerProvider.getStateManager(componentId); } @@ -127,7 +132,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { // if more threads have been allocated, add more tasks to the work queue final int tasksToAdd = maxThreadCount - oldMax; for (int i = 0; i < tasksToAdd; i++) { - final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); + final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount); flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS); } } @@ -151,9 +156,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private class EventDrivenTask implements Runnable { private final EventDrivenWorkerQueue workerQueue; + private final AtomicInteger activeThreadCount; - public EventDrivenTask(final EventDrivenWorkerQueue workerQueue) { + public EventDrivenTask(final EventDrivenWorkerQueue workerQueue, final AtomicInteger activeThreadCount) { this.workerQueue = workerQueue; + this.activeThreadCount = activeThreadCount; } @Override @@ -170,101 +177,106 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { continue; } - // get the connection index for this worker - AtomicLong connectionIndex = connectionIndexMap.get(connectable); - if (connectionIndex == null) { - connectionIndex = new AtomicLong(0L); - final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex); - if (existingConnectionIndex != null) { - connectionIndex = existingConnectionIndex; - } - } - - final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex); - - if (connectable instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier())); - - final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); - final ProcessSessionFactory sessionFactory; - final StandardProcessSession rawSession; - final boolean batch; - if (procNode.isHighThroughputSupported() && runNanos > 0L) { - rawSession = new StandardProcessSession(context); - sessionFactory = new BatchingSessionFactory(rawSession); - batch = true; - } else { - rawSession = null; - sessionFactory = new StandardProcessSessionFactory(context); - batch = false; + activeThreadCount.incrementAndGet(); + try { + // get the connection index for this worker + AtomicLong connectionIndex = connectionIndexMap.get(connectable); + if (connectionIndex == null) { + connectionIndex = new AtomicLong(0L); + final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex); + if (existingConnectionIndex != null) { + connectionIndex = existingConnectionIndex; + } } - final long startNanos = System.nanoTime(); - final long finishNanos = startNanos + runNanos; - int invocationCount = 0; - boolean shouldRun = true; + final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex); - try { - while (shouldRun) { - trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory); - invocationCount++; + if (connectable instanceof ProcessorNode) { + final ProcessorNode procNode = (ProcessorNode) connectable; + final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier())); - if (!batch) { - break; - } - if (System.nanoTime() > finishNanos) { - break; - } - if (!scheduleState.isScheduled()) { - break; - } - - final int eventCount = worker.decrementEventCount(); - if (eventCount < 0) { - worker.incrementEventCount(); - } - shouldRun = (eventCount > 0); - } - } finally { - if (batch && rawSession != null) { - try { - rawSession.commit(); - } catch (final RuntimeException re) { - logger.error("Unable to commit process session", re); - } + final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); + final ProcessSessionFactory sessionFactory; + final StandardProcessSession rawSession; + final boolean batch; + if (procNode.isHighThroughputSupported() && runNanos > 0L) { + rawSession = new StandardProcessSession(context); + sessionFactory = new BatchingSessionFactory(rawSession); + batch = true; + } else { + rawSession = null; + sessionFactory = new StandardProcessSessionFactory(context); + batch = false; } + + final long startNanos = System.nanoTime(); + final long finishNanos = startNanos + runNanos; + int invocationCount = 0; + boolean shouldRun = true; + try { - final long processingNanos = System.nanoTime() - startNanos; - final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); - procEvent.setProcessingNanos(processingNanos); - procEvent.setInvocations(invocationCount); - context.getFlowFileEventRepository().updateRepository(procEvent); - } catch (final IOException e) { - logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString()); - logger.error("", e); + while (shouldRun) { + trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory); + invocationCount++; + + if (!batch) { + break; + } + if (System.nanoTime() > finishNanos) { + break; + } + if (!scheduleState.isScheduled()) { + break; + } + + final int eventCount = worker.decrementEventCount(); + if (eventCount < 0) { + worker.incrementEventCount(); + } + shouldRun = (eventCount > 0); + } + } finally { + if (batch && rawSession != null) { + try { + rawSession.commit(); + } catch (final RuntimeException re) { + logger.error("Unable to commit process session", re); + } + } + try { + final long processingNanos = System.nanoTime() - startNanos; + final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + procEvent.setProcessingNanos(processingNanos); + procEvent.setInvocations(invocationCount); + context.getFlowFileEventRepository().updateRepository(procEvent); + } catch (final IOException e) { + logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString()); + logger.error("", e); + } + } + + // If the Processor has FlowFiles, go ahead and register it to run again. + // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine, + // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything + // off of its input queue. + // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and + // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this + // point is to register the Processor to run again. + if (Connectables.flowFilesQueued(procNode)) { + onEvent(procNode); + } + } else { + final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context); + final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); + trigger(connectable, scheduleState, connectableProcessContext, sessionFactory); + + // See explanation above for the ProcessorNode as to why we do this. + if (Connectables.flowFilesQueued(connectable)) { + onEvent(connectable); } } - - // If the Processor has FlowFiles, go ahead and register it to run again. - // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine, - // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything - // off of its input queue. - // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and - // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this - // point is to register the Processor to run again. - if (Connectables.flowFilesQueued(procNode)) { - onEvent(procNode); - } - } else { - final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context); - final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); - trigger(connectable, scheduleState, connectableProcessContext, sessionFactory); - - // See explanation above for the ProcessorNode as to why we do this. - if (Connectables.flowFilesQueued(connectable)) { - onEvent(connectable); - } + } finally { + activeThreadCount.decrementAndGet(); } } }