diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
index b0ba446484..1a98ee3006 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ConnectableTask.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
@@ -51,7 +52,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Continually runs a processor as long as the processor has work to do. {@link #call()} will return true
if the processor should be yielded, false
otherwise.
+ * Continually runs a {@link Connectable}
component as long as the component has work to do.
+ * {@link #invoke()} ()} will return {@link InvocationResult}
telling if the component should be yielded.
*/
public class ConnectableTask {
@@ -64,7 +66,6 @@ public class ConnectableTask {
private final ProcessContext processContext;
private final FlowController flowController;
private final int numRelationships;
- private final boolean hasNonLoopConnection;
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
@@ -76,7 +77,6 @@ public class ConnectableTask {
this.scheduleState = scheduleState;
this.numRelationships = connectable.getRelationships().size();
this.flowController = flowController;
- this.hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
if (connectable instanceof ProcessorNode) {
@@ -103,8 +103,40 @@ public class ConnectableTask {
return connectable.getYieldExpiration() > System.currentTimeMillis();
}
+ /**
+ * Make sure processor has work to do. This means that it meets one of these criteria:
+ *
+ * - It is a Funnel and has incoming FlowFiles from other components, and and at least one outgoing connection.
+ * - It is a 'source' component, meaning:
+ * - It is annotated with @TriggerWhenEmpty
+ * - It has no incoming connections
+ * - All incoming connections are self-loops
+ *
+ * - It has data in incoming connections to process
+ *
+ * @return true if there is work to do, otherwise false
+ */
private boolean isWorkToDo() {
- return connectable.isTriggerWhenEmpty() || !connectable.hasIncomingConnection() || !hasNonLoopConnection || Connectables.flowFilesQueued(connectable);
+ boolean hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
+
+ if (connectable.getConnectableType() == ConnectableType.FUNNEL) {
+ // Handle Funnel as a special case because it will never be a 'source' component,
+ // and also its outgoing connections can not be terminated.
+ // Incoming FlowFiles from other components, and at least one outgoing connection are required.
+ return connectable.hasIncomingConnection()
+ && hasNonLoopConnection
+ && !connectable.getConnections().isEmpty()
+ && Connectables.flowFilesQueued(connectable);
+ }
+
+ final boolean isSourceComponent = connectable.isTriggerWhenEmpty()
+ // No input connections
+ || !connectable.hasIncomingConnection()
+ // Every incoming connection loops back to itself, no inputs from other components
+ || !hasNonLoopConnection;
+
+ // If it is not a 'source' component, it requires a FlowFile to process.
+ return isSourceComponent || Connectables.flowFilesQueued(connectable);
}
private boolean isBackPressureEngaged() {
@@ -129,11 +161,7 @@ public class ConnectableTask {
return InvocationResult.DO_NOT_YIELD;
}
- // Make sure processor has work to do. This means that it meets one of these criteria:
- // * It is annotated with @TriggerWhenEmpty
- // * It has data in an incoming Connection
- // * It has no incoming connections
- // * All incoming connections are self-loops
+ // Make sure processor has work to do.
if (!isWorkToDo()) {
return InvocationResult.yield("No work to do");
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
index 3ff95800c7..7214b8079e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestConnectableTask.java
@@ -21,12 +21,17 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -41,15 +46,9 @@ import org.junit.Test;
import org.mockito.Mockito;
public class TestConnectableTask {
- @Test
- public void testIsWorkToDo() {
- final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
- Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
- final Processor processor = Mockito.mock(Processor.class);
- Mockito.when(procNode.getIdentifier()).thenReturn("123");
- Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
+ private ConnectableTask createTask(final Connectable connectable) {
final FlowController flowController = Mockito.mock(FlowController.class);
Mockito.when(flowController.getStateManagerProvider()).thenReturn(Mockito.mock(StateManagerProvider.class));
@@ -61,9 +60,22 @@ public class TestConnectableTask {
final LifecycleState scheduleState = new LifecycleState();
final StringEncryptor encryptor = Mockito.mock(StringEncryptor.class);
- ConnectableTask task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
+
+ return new ConnectableTask(Mockito.mock(SchedulingAgent.class), connectable,
+ flowController, contextFactory, scheduleState, encryptor);
+ }
+
+ @Test
+ public void testIsWorkToDo() {
+ final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
+ Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
+
+ final Processor processor = Mockito.mock(Processor.class);
+ Mockito.when(procNode.getIdentifier()).thenReturn("123");
+ Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
// There is work to do because there are no incoming connections.
+ final ConnectableTask task = createTask(procNode);
assertFalse(task.invoke().isYield());
// Test with only a single connection that is self-looping and empty
@@ -93,8 +105,6 @@ public class TestConnectableTask {
when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
- // Create a new ConnectableTask because we want to have a different value for the 'hasNonLoopConnection' value, which is calculated in the task's constructor.
- task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
assertTrue(task.invoke().isYield());
// test when the queue has data
@@ -106,4 +116,67 @@ public class TestConnectableTask {
assertFalse(task.invoke().isYield());
}
+ @Test
+ public void testIsWorkToDoFunnels() {
+ final Funnel funnel = Mockito.mock(Funnel.class);
+ Mockito.when(funnel.hasIncomingConnection()).thenReturn(false);
+ Mockito.when(funnel.getRunnableComponent()).thenReturn(funnel);
+ Mockito.when(funnel.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
+ Mockito.when(funnel.getIdentifier()).thenReturn("funnel-1");
+
+ final ConnectableTask task = createTask(funnel);
+ assertTrue("If there is no incoming connection, it should be yielded.", task.invoke().isYield());
+
+ // Test with only a single connection that is self-looping and empty.
+ // Actually, this self-loop input can not be created for Funnels using NiFi API because an outer layer check condition does not allow it.
+ // But test it anyways.
+ final Connection selfLoopingConnection = Mockito.mock(Connection.class);
+ when(selfLoopingConnection.getSource()).thenReturn(funnel);
+ when(selfLoopingConnection.getDestination()).thenReturn(funnel);
+
+ when(funnel.hasIncomingConnection()).thenReturn(true);
+ when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
+
+ final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
+ when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
+ when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);
+
+ final Set outgoingConnections = new HashSet<>();
+ outgoingConnections.add(selfLoopingConnection);
+ when(funnel.getConnections()).thenReturn(outgoingConnections);
+
+ assertTrue("If there is no incoming connection from other components, it should be yielded.", task.invoke().isYield());
+
+ // Add an incoming connection from another component.
+ final ProcessorNode inputProcessor = Mockito.mock(ProcessorNode.class);
+ final Connection incomingFromAnotherComponent = Mockito.mock(Connection.class);
+ when(incomingFromAnotherComponent.getSource()).thenReturn(inputProcessor);
+ when(incomingFromAnotherComponent.getDestination()).thenReturn(funnel);
+ when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(emptyQueue);
+
+ when(funnel.hasIncomingConnection()).thenReturn(true);
+ when(funnel.getIncomingConnections()).thenReturn(Arrays.asList(selfLoopingConnection, incomingFromAnotherComponent));
+
+ assertTrue("Even if there is an incoming connection from another component," +
+ " it should be yielded because there's no outgoing connections.", task.invoke().isYield());
+
+ // Add an outgoing connection to another component.
+ final ProcessorNode outputProcessor = Mockito.mock(ProcessorNode.class);
+ final Connection outgoingToAnotherComponent = Mockito.mock(Connection.class);
+ when(outgoingToAnotherComponent.getSource()).thenReturn(funnel);
+ when(outgoingToAnotherComponent.getDestination()).thenReturn(outputProcessor);
+ outgoingConnections.add(outgoingToAnotherComponent);
+
+ assertTrue("Even if there is an incoming connection from another component and an outgoing connection as well," +
+ " it should be yielded because there's no incoming FlowFiles to process.", task.invoke().isYield());
+
+ // Adding input FlowFiles.
+ final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
+ when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
+ when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
+ assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
+ task.invoke().isYield());
+
+ }
+
}