NIFI-1271: Yield funnels and ports for nifi.bored.yield.duration amount of time if backpressure is applied, as we do when there are no input FlowFiles. Adjusting logic for ContinuallyRunProcessorTask#call in determining if there is appropriate availability for processor relationships.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Mark Payne 2015-12-08 07:53:36 -05:00 committed by Aldrin Piri
parent ba2719836b
commit fb65cf1235
2 changed files with 6 additions and 5 deletions

View File

@ -68,10 +68,11 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
// 4. There is a connection for each relationship.
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
boolean flowFilesQueued = true;
boolean relationshipAvailable = true;
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
&& (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable)))
&& (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty())
&& (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
&& (connectable.getRelationships().isEmpty() || (relationshipAvailable = Connectables.anyRelationshipAvailable(connectable)));
if (shouldRun) {
scheduleState.incrementActiveThreadCount();
@ -100,9 +101,9 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
scheduleState.decrementActiveThreadCount();
}
} else if (!flowFilesQueued) {
// FlowFiles must be queued in order to run but there are none queued;
// yield for just a bit.
} else if (!flowFilesQueued || !relationshipAvailable) {
// Either there are no FlowFiles queued, or the relationship is not available (i.e., backpressure is applied).
// We will yield for just a bit.
return true;
}

View File

@ -109,7 +109,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
return false;
return true;
}
}