NIFI-362: Avoid continually scheduling components to run if there is no work for them to do or if they are yielded

This commit is contained in:
Mark Payne 2015-02-22 10:53:24 -05:00
parent dde5fd51a4
commit 4cc106a54d
6 changed files with 163 additions and 47 deletions

View File

@ -94,7 +94,7 @@ public class StandardFunnel implements Funnel {
position = new AtomicReference<>(new Position(0D, 0D)); position = new AtomicReference<>(new Position(0D, 0D));
scheduledState = new AtomicReference<>(ScheduledState.STOPPED); scheduledState = new AtomicReference<>(ScheduledState.STOPPED);
penalizationPeriod = new AtomicReference<>("30 sec"); penalizationPeriod = new AtomicReference<>("30 sec");
yieldPeriod = new AtomicReference<>("1 sec"); yieldPeriod = new AtomicReference<>("250 millis");
yieldExpiration = new AtomicLong(0L); yieldExpiration = new AtomicLong(0L);
schedulingPeriod = new AtomicReference<>("0 millis"); schedulingPeriod = new AtomicReference<>("0 millis");
schedulingNanos = new AtomicLong(30000); schedulingNanos = new AtomicLong(30000);

View File

@ -21,6 +21,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -34,8 +35,9 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper; import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.quartz.CronExpression; import org.quartz.CronExpression;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -130,13 +132,16 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
final List<AtomicBoolean> triggers = new ArrayList<>(); final List<AtomicBoolean> triggers = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final Runnable continuallyRunTask; final Callable<Boolean> continuallyRunTask;
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) { if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable; final ProcessorNode procNode = (ProcessorNode) connectable;
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor);
final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask; continuallyRunTask = runnableTask;
} else { } else {
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor); final ConnectableProcessContext connProcContext = new ConnectableProcessContext(connectable, encryptor);
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, connProcContext);
} }
final AtomicBoolean canceled = new AtomicBoolean(false); final AtomicBoolean canceled = new AtomicBoolean(false);
@ -147,7 +152,13 @@ public class QuartzSchedulingAgent implements SchedulingAgent {
return; return;
} }
continuallyRunTask.run(); try {
continuallyRunTask.call();
} catch (final RuntimeException re) {
throw re;
} catch (final Exception e) {
throw new ProcessException(e);
}
if (canceled.get()) { if (canceled.get()) {
return; return;

View File

@ -16,9 +16,10 @@
*/ */
package org.apache.nifi.controller.scheduling; package org.apache.nifi.controller.scheduling;
import java.util.ArrayList; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -27,7 +28,7 @@ public class ScheduleState {
private final AtomicInteger activeThreadCount = new AtomicInteger(0); private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final AtomicBoolean scheduled = new AtomicBoolean(false); private final AtomicBoolean scheduled = new AtomicBoolean(false);
private final List<ScheduledFuture<?>> futures = new ArrayList<>(); private final Set<ScheduledFuture<?>> futures = new HashSet<ScheduledFuture<?>>();
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1; private volatile long lastStopTime = -1;
@ -79,12 +80,17 @@ public class ScheduleState {
* *
* @param newFutures * @param newFutures
*/ */
public void setFutures(final List<ScheduledFuture<?>> newFutures) { public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
futures.clear(); futures.clear();
futures.addAll(newFutures); futures.addAll(newFutures);
} }
public List<ScheduledFuture<?>> getFutures() { public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final ScheduledFuture<?> newFuture) {
return Collections.unmodifiableList(futures); futures.remove(oldFuture);
futures.add(newFuture);
}
public synchronized Set<ScheduledFuture<?>> getFutures() {
return Collections.unmodifiableSet(futures);
} }
} }

View File

@ -18,8 +18,10 @@ package org.apache.nifi.controller.scheduling;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.ConnectableType;
@ -31,14 +33,16 @@ import org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper; import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine; import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.StandardProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class TimerDrivenSchedulingAgent implements SchedulingAgent { public class TimerDrivenSchedulingAgent implements SchedulingAgent {
private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class); private static final Logger logger = LoggerFactory.getLogger(TimerDrivenSchedulingAgent.class);
private static final long NO_WORK_YIELD_NANOS = TimeUnit.MILLISECONDS.toNanos(10L);
private final FlowController flowController; private final FlowController flowController;
private final FlowEngine flowEngine; private final FlowEngine flowEngine;
@ -72,20 +76,95 @@ public class TimerDrivenSchedulingAgent implements SchedulingAgent {
logger.info("{} started.", taskNode.getReportingTask()); logger.info("{} started.", taskNode.getReportingTask());
} }
@Override @Override
public void schedule(final Connectable connectable, final ScheduleState scheduleState) { public void schedule(final Connectable connectable, final ScheduleState scheduleState) {
final Runnable runnable;
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController, contextFactory, scheduleState, encryptor);
runnable = runnableTask;
} else {
runnable = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, encryptor);
}
final List<ScheduledFuture<?>> futures = new ArrayList<>(); final List<ScheduledFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(runnable, 0L, connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); final Callable<Boolean> continuallyRunTask;
final ProcessContext processContext;
// Determine the task to run and create it.
if (connectable.getConnectableType() == ConnectableType.PROCESSOR) {
final ProcessorNode procNode = (ProcessorNode) connectable;
final StandardProcessContext standardProcContext = new StandardProcessContext(procNode, flowController, encryptor);
final ContinuallyRunProcessorTask runnableTask = new ContinuallyRunProcessorTask(this, procNode, flowController,
contextFactory, scheduleState, standardProcContext);
continuallyRunTask = runnableTask;
processContext = standardProcContext;
} else {
processContext = new ConnectableProcessContext(connectable, encryptor);
continuallyRunTask = new ContinuallyRunConnectableTask(contextFactory, connectable, scheduleState, processContext);
}
final AtomicReference<ScheduledFuture<?>> futureRef = new AtomicReference<>();
final Runnable yieldDetectionRunnable = new Runnable() {
@Override
public void run() {
// Call the continually run task. It will return a boolean indicating whether or not we should yield
// based on a lack of work for to do for the component.
final boolean shouldYield;
try {
shouldYield = continuallyRunTask.call();
} catch (final RuntimeException re) {
throw re;
} catch (final Exception e) {
throw new ProcessException(e);
}
// If the component is yielded, cancel its future and re-submit it to run again
// after the yield has expired.
final long newYieldExpiration = connectable.getYieldExpiration();
if ( newYieldExpiration > System.currentTimeMillis() ) {
final long yieldMillis = System.currentTimeMillis() - newYieldExpiration;
final ScheduledFuture<?> scheduledFuture = futureRef.get();
if ( scheduledFuture == null ) {
return;
}
// If we are able to cancel the future, create a new one and update the ScheduleState so that it has
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
// so that we can do this again the next time that the component is yielded.
if (scheduledFuture.cancel(false)) {
final long yieldNanos = TimeUnit.MILLISECONDS.toNanos(yieldMillis);
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
scheduleState.replaceFuture(scheduledFuture, newFuture);
futureRef.set(newFuture);
}
} else if ( shouldYield ) {
// Component itself didn't yield but there was no work to do, so the framework will choose
// to yield the component automatically for a short period of time.
final ScheduledFuture<?> scheduledFuture = futureRef.get();
if ( scheduledFuture == null ) {
return;
}
// If we are able to cancel the future, create a new one and update the ScheduleState so that it has
// an accurate accounting of which futures are outstanding; we must then also update the futureRef
// so that we can do this again the next time that the component is yielded.
if (scheduledFuture.cancel(false)) {
final ScheduledFuture<?> newFuture = flowEngine.scheduleWithFixedDelay(this, NO_WORK_YIELD_NANOS,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
scheduleState.replaceFuture(scheduledFuture, newFuture);
futureRef.set(newFuture);
}
}
}
};
// Schedule the task to run
final ScheduledFuture<?> future = flowEngine.scheduleWithFixedDelay(yieldDetectionRunnable, 0L,
connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
// now that we have the future, set the atomic reference so that if the component is yielded we
// are able to then cancel this future.
futureRef.set(future);
// Keep track of the futures so that we can update the ScheduleState.
futures.add(future); futures.add(future);
} }

View File

@ -16,16 +16,16 @@
*/ */
package org.apache.nifi.controller.tasks; package org.apache.nifi.controller.tasks;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables; import org.apache.nifi.util.Connectables;
@ -33,28 +33,33 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ContinuallyRunConnectableTask implements Runnable { /**
* Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return
* <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise.
*/
public class ContinuallyRunConnectableTask implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class); private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class);
private final Connectable connectable; private final Connectable connectable;
private final ScheduleState scheduleState; private final ScheduleState scheduleState;
private final ProcessSessionFactory sessionFactory; private final ProcessSessionFactory sessionFactory;
private final ConnectableProcessContext processContext; private final ProcessContext processContext;
public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) { public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final ProcessContext processContext) {
this.connectable = connectable; this.connectable = connectable;
this.scheduleState = scheduleState; this.scheduleState = scheduleState;
this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L))); this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L)));
this.processContext = new ConnectableProcessContext(connectable, encryptor); this.processContext = processContext;
} }
@SuppressWarnings("deprecation")
@Override @Override
public void run() { @SuppressWarnings("deprecation")
public Boolean call() {
if (!scheduleState.isScheduled()) { if (!scheduleState.isScheduled()) {
return; return false;
} }
// Connectable should run if the following conditions are met: // Connectable should run if the following conditions are met:
// 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued // 1. It's an Input Port or or is a Remote Input Port or has incoming FlowFiles queued
// 2. Any relationship is available (since there's only 1 // 2. Any relationship is available (since there's only 1
@ -62,8 +67,9 @@ public class ContinuallyRunConnectableTask implements Runnable {
// it means the same thing) // it means the same thing)
// 3. It is not yielded. // 3. It is not yielded.
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty(); final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
boolean flowFilesQueued = true;
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis()) final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
&& (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable)); && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
if (shouldRun) { if (shouldRun) {
scheduleState.incrementActiveThreadCount(); scheduleState.incrementActiveThreadCount();
@ -92,6 +98,12 @@ public class ContinuallyRunConnectableTask implements Runnable {
scheduleState.decrementActiveThreadCount(); scheduleState.decrementActiveThreadCount();
} }
} else if (!flowFilesQueued) {
// FlowFiles must be queued in order to run but there are none queued;
// yield for just a bit.
return true;
} }
return true;
} }
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.controller.tasks; package org.apache.nifi.controller.tasks;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -31,7 +32,6 @@ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.ProcessContextFactory; import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.controller.scheduling.ScheduleState;
import org.apache.nifi.controller.scheduling.SchedulingAgent; import org.apache.nifi.controller.scheduling.SchedulingAgent;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.ProcessSessionFactory;
@ -43,7 +43,12 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
public class ContinuallyRunProcessorTask implements Runnable {
/**
* Continually runs a processor as long as the processor has work to do. {@link #call()} will return
* <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
*/
public class ContinuallyRunProcessorTask implements Callable<Boolean> {
private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class); private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);
@ -56,7 +61,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
private final int numRelationships; private final int numRelationships;
public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode, public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) { final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState,
final StandardProcessContext processContext) {
this.schedulingAgent = schedulingAgent; this.schedulingAgent = schedulingAgent;
this.procNode = procNode; this.procNode = procNode;
@ -65,28 +71,28 @@ public class ContinuallyRunProcessorTask implements Runnable {
this.flowController = flowController; this.flowController = flowController;
context = contextFactory.newProcessContext(procNode, new AtomicLong(0L)); context = contextFactory.newProcessContext(procNode, new AtomicLong(0L));
this.processContext = new StandardProcessContext(procNode, flowController, encryptor); this.processContext = processContext;
} }
@SuppressWarnings("deprecation")
@Override @Override
public void run() { @SuppressWarnings("deprecation")
public Boolean call() {
// make sure processor is not yielded // make sure processor is not yielded
boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis()); boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
if (!shouldRun) { if (!shouldRun) {
return; return false;
} }
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary(); shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
if (!shouldRun) { if (!shouldRun) {
return; return false;
} }
// make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
if (!shouldRun) { if (!shouldRun) {
return; return true;
} }
if (numRelationships > 0) { if (numRelationships > 0) {
@ -109,7 +115,7 @@ public class ContinuallyRunProcessorTask implements Runnable {
} }
if (!shouldRun) { if (!shouldRun) {
return; return false;
} }
scheduleState.incrementActiveThreadCount(); scheduleState.incrementActiveThreadCount();
@ -124,11 +130,11 @@ public class ContinuallyRunProcessorTask implements Runnable {
invocationCount++; invocationCount++;
if (!batch) { if (!batch) {
return; return false;
} }
if (System.nanoTime() > finishNanos) { if (System.nanoTime() > finishNanos) {
return; return false;
} }
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
@ -180,6 +186,8 @@ public class ContinuallyRunProcessorTask implements Runnable {
logger.error("", e); logger.error("", e);
} }
} }
return false;
} }
} }