NIFI-4379: Event-Driven threads are constantly active, polling a queue to see if there is any work to do. Instead of getting number of active threads from the ScheduledExecutor for these threads, updated code to use an AtomicInteger that is incremented when there's work to do and decremented once complete

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2153.
This commit is contained in:
Mark Payne 2017-09-13 11:01:07 -04:00 committed by Pierre Villard
parent 1e70e24267
commit e01d59a462
2 changed files with 108 additions and 93 deletions

View File

@ -280,6 +280,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final AtomicInteger maxEventDrivenThreads;
private final AtomicReference<FlowEngine> timerDrivenEngineRef;
private final AtomicReference<FlowEngine> eventDrivenEngineRef;
private final EventDrivenSchedulingAgent eventDrivenSchedulingAgent;
private final ContentRepository contentRepository;
private final FlowFileRepository flowFileRepository;
@ -502,8 +503,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor));
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), this, stateManagerProvider, eventDrivenWorkerQueue, contextFactory, maxEventDrivenThreads.get(), encryptor);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), contextFactory, encryptor, this.nifiProperties);
@ -3582,7 +3585,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public int getActiveThreadCount() {
final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
final int eventDrivenCount = eventDrivenEngineRef.get().getActiveCount();
final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount();
return timerDrivenCount + eventDrivenCount;
}

View File

@ -60,6 +60,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private final EventDrivenWorkerQueue workerQueue;
private final ProcessContextFactory contextFactory;
private final AtomicInteger maxThreadCount;
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final StringEncryptor encryptor;
private volatile String adminYieldDuration = "1 sec";
@ -78,11 +79,15 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
this.encryptor = encryptor;
for (int i = 0; i < maxThreadCount; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
}
}
public int getActiveThreadCount() {
return activeThreadCount.get();
}
private StateManager getStateManager(final String componentId) {
return stateManagerProvider.getStateManager(componentId);
}
@ -127,7 +132,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
// if more threads have been allocated, add more tasks to the work queue
final int tasksToAdd = maxThreadCount - oldMax;
for (int i = 0; i < tasksToAdd; i++) {
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
final Runnable eventDrivenTask = new EventDrivenTask(workerQueue, activeThreadCount);
flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
}
}
@ -151,9 +156,11 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
private class EventDrivenTask implements Runnable {
private final EventDrivenWorkerQueue workerQueue;
private final AtomicInteger activeThreadCount;
public EventDrivenTask(final EventDrivenWorkerQueue workerQueue) {
public EventDrivenTask(final EventDrivenWorkerQueue workerQueue, final AtomicInteger activeThreadCount) {
this.workerQueue = workerQueue;
this.activeThreadCount = activeThreadCount;
}
@Override
@ -170,101 +177,106 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
continue;
}
// get the connection index for this worker
AtomicLong connectionIndex = connectionIndexMap.get(connectable);
if (connectionIndex == null) {
connectionIndex = new AtomicLong(0L);
final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex);
if (existingConnectionIndex != null) {
connectionIndex = existingConnectionIndex;
}
}
final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex);
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier()));
final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
final StandardProcessSession rawSession;
final boolean batch;
if (procNode.isHighThroughputSupported() && runNanos > 0L) {
rawSession = new StandardProcessSession(context);
sessionFactory = new BatchingSessionFactory(rawSession);
batch = true;
} else {
rawSession = null;
sessionFactory = new StandardProcessSessionFactory(context);
batch = false;
activeThreadCount.incrementAndGet();
try {
// get the connection index for this worker
AtomicLong connectionIndex = connectionIndexMap.get(connectable);
if (connectionIndex == null) {
connectionIndex = new AtomicLong(0L);
final AtomicLong existingConnectionIndex = connectionIndexMap.putIfAbsent(connectable, connectionIndex);
if (existingConnectionIndex != null) {
connectionIndex = existingConnectionIndex;
}
}
final long startNanos = System.nanoTime();
final long finishNanos = startNanos + runNanos;
int invocationCount = 0;
boolean shouldRun = true;
final ProcessContext context = contextFactory.newProcessContext(connectable, connectionIndex);
try {
while (shouldRun) {
trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory);
invocationCount++;
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StandardProcessContext standardProcessContext = new StandardProcessContext(procNode, serviceProvider, encryptor, getStateManager(connectable.getIdentifier()));
if (!batch) {
break;
}
if (System.nanoTime() > finishNanos) {
break;
}
if (!scheduleState.isScheduled()) {
break;
}
final int eventCount = worker.decrementEventCount();
if (eventCount < 0) {
worker.incrementEventCount();
}
shouldRun = (eventCount > 0);
}
} finally {
if (batch && rawSession != null) {
try {
rawSession.commit();
} catch (final RuntimeException re) {
logger.error("Unable to commit process session", re);
}
final long runNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
final ProcessSessionFactory sessionFactory;
final StandardProcessSession rawSession;
final boolean batch;
if (procNode.isHighThroughputSupported() && runNanos > 0L) {
rawSession = new StandardProcessSession(context);
sessionFactory = new BatchingSessionFactory(rawSession);
batch = true;
} else {
rawSession = null;
sessionFactory = new StandardProcessSessionFactory(context);
batch = false;
}
final long startNanos = System.nanoTime();
final long finishNanos = startNanos + runNanos;
int invocationCount = 0;
boolean shouldRun = true;
try {
final long processingNanos = System.nanoTime() - startNanos;
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
context.getFlowFileEventRepository().updateRepository(procEvent);
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
logger.error("", e);
while (shouldRun) {
trigger(procNode, context, scheduleState, standardProcessContext, sessionFactory);
invocationCount++;
if (!batch) {
break;
}
if (System.nanoTime() > finishNanos) {
break;
}
if (!scheduleState.isScheduled()) {
break;
}
final int eventCount = worker.decrementEventCount();
if (eventCount < 0) {
worker.incrementEventCount();
}
shouldRun = (eventCount > 0);
}
} finally {
if (batch && rawSession != null) {
try {
rawSession.commit();
} catch (final RuntimeException re) {
logger.error("Unable to commit process session", re);
}
}
try {
final long processingNanos = System.nanoTime() - startNanos;
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(connectable.getIdentifier());
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocationCount);
context.getFlowFileEventRepository().updateRepository(procEvent);
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable, e.toString());
logger.error("", e);
}
}
// If the Processor has FlowFiles, go ahead and register it to run again.
// We do this because it's possible (and fairly common) for a Processor to be triggered and then determine,
// for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything
// off of its input queue.
// In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and
// confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this
// point is to register the Processor to run again.
if (Connectables.flowFilesQueued(procNode)) {
onEvent(procNode);
}
} else {
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context);
final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
trigger(connectable, scheduleState, connectableProcessContext, sessionFactory);
// See explanation above for the ProcessorNode as to why we do this.
if (Connectables.flowFilesQueued(connectable)) {
onEvent(connectable);
}
}
// If the Processor has FlowFiles, go ahead and register it to run again.
// We do this because it's possible (and fairly common) for a Processor to be triggered and then determine,
// for whatever reason, that it is not ready to do anything and as a result simply returns without pulling anything
// off of its input queue.
// In this case, we could just say that the Processor shouldn't be Event-Driven, but then it becomes very complex and
// confusing to determine whether or not a Processor is really Event-Driven. So, the solution that we will use at this
// point is to register the Processor to run again.
if (Connectables.flowFilesQueued(procNode)) {
onEvent(procNode);
}
} else {
final ProcessSessionFactory sessionFactory = new StandardProcessSessionFactory(context);
final ConnectableProcessContext connectableProcessContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
trigger(connectable, scheduleState, connectableProcessContext, sessionFactory);
// See explanation above for the ProcessorNode as to why we do this.
if (Connectables.flowFilesQueued(connectable)) {
onEvent(connectable);
}
} finally {
activeThreadCount.decrementAndGet();
}
}
}