diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 9c181ffaf1..5d65f89b88 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -280,6 +280,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R private final AtomicInteger maxEventDrivenThreads; private final AtomicReference timerDrivenEngineRef; private final AtomicReference eventDrivenEngineRef; + private final EventDrivenSchedulingAgent eventDrivenSchedulingAgent; private final ContentRepository contentRepository; private final FlowFileRepository flowFileRepository; @@ -502,8 +503,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler); final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository); - processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent( - eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor)); + + eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent( + eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor); + processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent); final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor); final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.nifiProperties); @@ -3582,7 +3585,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R public int getActiveThreadCount() { final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount(); - final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount(); + final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount(); return timerDrivenCount + eventDrivenCount; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index 58d25bb2fb..7b2e9662cd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -60,6 +60,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private final EventDrivenWorkerQueue workerQueue; private final ProcessContextFactory contextFactory; private final AtomicInteger maxThreadCount; + private final AtomicInteger activeThreadCount = new AtomicInteger(0); private final StringEncryptor encryptor; private volatile String adminYieldDuration = "1 sec"; @@ -78,11 +79,15 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { this.encryptor = encryptor; for (int i = 0; i < maxThreadCount; i++) { - final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); + final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount); flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS); } } + public int getActiveThreadCount() { + return activeThreadCount.get(); + } + private StateManager getStateManager(final String componentId) { return stateManagerProvider.getStateManager(componentId); } @@ -127,7 +132,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { // if more threads have been allocated, add more tasks to the work queue final int tasksToAdd = maxThreadCount - oldMax; for (int i = 0; i < tasksToAdd; i++) { - final Runnable eventDrivenTask = new EventDrivenTask(workerQueue); + final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount); flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS); } } @@ -151,9 +156,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { private class EventDrivenTask implements Runnable { private final EventDrivenWorkerQueue workerQueue; + private final AtomicInteger activeThreadCount; - public EventDrivenTask(final EventDrivenWorkerQueue workerQueue) { + public EventDrivenTask(final EventDrivenWorkerQueue workerQueue, final AtomicInteger activeThreadCount) { this.workerQueue = workerQueue; + this.activeThreadCount = activeThreadCount; } @Override @@ -170,101 +177,106 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent { continue; } - // get the connection index for this worker - AtomicLong connectionIndex = connectionIndexMap.get(connectable); - if (connectionIndex == null) { - connectionIndex = new AtomicLong(0L); - final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex); - if (existingConnectionIndex != null) { - connectionIndex = existingConnectionIndex; - } - } - - final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex); - - if (connectable instanceof ProcessorNode) { - final ProcessorNode procNode = (ProcessorNode) connectable; - final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier())); - - final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); - final ProcessSessionFactory sessionFactory; - final StandardProcessSession rawSession; - final boolean batch; - if (procNode.isHighThroughputSupported() && runNanos > 0L) { - rawSession = new StandardProcessSession(context); - sessionFactory = new BatchingSessionFactory(rawSession); - batch = true; - } else { - rawSession = null; - sessionFactory = new StandardProcessSessionFactory(context); - batch = false; + activeThreadCount.incrementAndGet(); + try { + // get the connection index for this worker + AtomicLong connectionIndex = connectionIndexMap.get(connectable); + if (connectionIndex == null) { + connectionIndex = new AtomicLong(0L); + final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex); + if (existingConnectionIndex != null) { + connectionIndex = existingConnectionIndex; + } } - final long startNanos = System.nanoTime(); - final long finishNanos = startNanos + runNanos; - int invocationCount = 0; - boolean shouldRun = true; + final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex); - try { - while (shouldRun) { - trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory); - invocationCount++; + if (connectable instanceof ProcessorNode) { + final ProcessorNode procNode = (ProcessorNode) connectable; + final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier())); - if (!batch) { - break; - } - if (System.nanoTime() > finishNanos) { - break; - } - if (!scheduleState.isScheduled()) { - break; - } - - final int eventCount = worker.decrementEventCount(); - if (eventCount < 0) { - worker.incrementEventCount(); - } - shouldRun = (eventCount > 0); - } - } finally { - if (batch && rawSession != null) { - try { - rawSession.commit(); - } catch (final RuntimeException re) { - logger.error("Unable to commit process session", re); - } + final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); + final ProcessSessionFactory sessionFactory; + final StandardProcessSession rawSession; + final boolean batch; + if (procNode.isHighThroughputSupported() && runNanos > 0L) { + rawSession = new StandardProcessSession(context); + sessionFactory = new BatchingSessionFactory(rawSession); + batch = true; + } else { + rawSession = null; + sessionFactory = new StandardProcessSessionFactory(context); + batch = false; } + + final long startNanos = System.nanoTime(); + final long finishNanos = startNanos + runNanos; + int invocationCount = 0; + boolean shouldRun = true; + try { - final long processingNanos = System.nanoTime() - startNanos; - final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); - procEvent.setProcessingNanos(processingNanos); - procEvent.setInvocations(invocationCount); - context.getFlowFileEventRepository().updateRepository(procEvent); - } catch (final IOException e) { - logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString()); - logger.error("", e); + while (shouldRun) { + trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory); + invocationCount++; + + if (!batch) { + break; + } + if (System.nanoTime() > finishNanos) { + break; + } + if (!scheduleState.isScheduled()) { + break; + } + + final int eventCount = worker.decrementEventCount(); + if (eventCount < 0) { + worker.incrementEventCount(); + } + shouldRun = (eventCount > 0); + } + } finally { + if (batch && rawSession != null) { + try { + rawSession.commit(); + } catch (final RuntimeException re) { + logger.error("Unable to commit process session", re); + } + } + try { + final long processingNanos = System.nanoTime() - startNanos; + final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier()); + procEvent.setProcessingNanos(processingNanos); + procEvent.setInvocations(invocationCount); + context.getFlowFileEventRepository().updateRepository(procEvent); + } catch (final IOException e) { + logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString()); + logger.error("", e); + } + } + + // If the Processor has FlowFiles, go ahead and register it to run again. + // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine, + // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything + // off of its input queue. + // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and + // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this + // point is to register the Processor to run again. + if (Connectables.flowFilesQueued(procNode)) { + onEvent(procNode); + } + } else { + final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context); + final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); + trigger(connectable, scheduleState, connectableProcessContext, sessionFactory); + + // See explanation above for the ProcessorNode as to why we do this. + if (Connectables.flowFilesQueued(connectable)) { + onEvent(connectable); } } - - // If the Processor has FlowFiles, go ahead and register it to run again. - // We do this because it's possible (and fairly common) for a Processor to be triggered and then determine, - // for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything - // off of its input queue. - // In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and - // confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this - // point is to register the Processor to run again. - if (Connectables.flowFilesQueued(procNode)) { - onEvent(procNode); - } - } else { - final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context); - final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier())); - trigger(connectable, scheduleState, connectableProcessContext, sessionFactory); - - // See explanation above for the ProcessorNode as to why we do this. - if (Connectables.flowFilesQueued(connectable)) { - onEvent(connectable); - } + } finally { + activeThreadCount.decrementAndGet(); } } }