From fb65cf1235d7a1176c58244bc4307f72de0e2a5d Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 8 Dec 2015 07:53:36 -0500 Subject: [PATCH] NIFI-1271: Yield funnels and ports for nifi.bored.yield.duration amount of time if backpressure is applied, as we do when there are no input FlowFiles. Adjusting logic for ContinuallyRunProcessorTask#call in determining if there is appropriate availability for processor relationships. Signed-off-by: Aldrin Piri --- .../controller/tasks/ContinuallyRunConnectableTask.java | 9 +++++---- .../controller/tasks/ContinuallyRunProcessorTask.java | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index 0380e6eab6..04e3f6057f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -68,10 +68,11 @@ public class ContinuallyRunConnectableTask implements Callable { // 4. There is a connection for each relationship. final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty(); boolean flowFilesQueued = true; + boolean relationshipAvailable = true; final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis()) && (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable))) && (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty()) - && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable)); + && (connectable.getRelationships().isEmpty() || (relationshipAvailable = Connectables.anyRelationshipAvailable(connectable))); if (shouldRun) { scheduleState.incrementActiveThreadCount(); @@ -100,9 +101,9 @@ public class ContinuallyRunConnectableTask implements Callable { scheduleState.decrementActiveThreadCount(); } - } else if (!flowFilesQueued) { - // FlowFiles must be queued in order to run but there are none queued; - // yield for just a bit. + } else if (!flowFilesQueued || !relationshipAvailable) { + // Either there are no FlowFiles queued, or the relationship is not available (i.e., backpressure is applied). + // We will yield for just a bit. return true; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index dd12824095..de91a6df47 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -109,7 +109,7 @@ public class ContinuallyRunProcessorTask implements Callable { if (numRelationships > 0) { final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships; if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) { - return false; + return true; } }