mirror of
https://github.com/apache/nifi.git
synced 2025-02-10 12:05:22 +00:00
NIFI-5075: Do not execute Funnels with no outgoing connections
- Added dedicated conditions for Funnels - Fixed stale Javadoc - Stopped caching hasNonLoopConnection variable - Grouped some conditions to isSourceComponent variable for better readability This closes #2634. Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
parent
2799211946
commit
8e4aa6bf22
@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.connectable.ConnectableType;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.ScheduledState;
|
||||
@ -51,7 +52,8 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Continually runs a processor as long as the processor has work to do. {@link #call()} will return <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
|
||||
* Continually runs a <code>{@link Connectable}</code> component as long as the component has work to do.
|
||||
* {@link #invoke()} ()} will return <code>{@link InvocationResult}</code> telling if the component should be yielded.
|
||||
*/
|
||||
public class ConnectableTask {
|
||||
|
||||
@ -64,7 +66,6 @@ public class ConnectableTask {
|
||||
private final ProcessContext processContext;
|
||||
private final FlowController flowController;
|
||||
private final int numRelationships;
|
||||
private final boolean hasNonLoopConnection;
|
||||
|
||||
|
||||
public ConnectableTask(final SchedulingAgent schedulingAgent, final Connectable connectable,
|
||||
@ -76,7 +77,6 @@ public class ConnectableTask {
|
||||
this.scheduleState = scheduleState;
|
||||
this.numRelationships = connectable.getRelationships().size();
|
||||
this.flowController = flowController;
|
||||
this.hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
|
||||
|
||||
final StateManager stateManager = new TaskTerminationAwareStateManager(flowController.getStateManagerProvider().getStateManager(connectable.getIdentifier()), scheduleState::isTerminated);
|
||||
if (connectable instanceof ProcessorNode) {
|
||||
@ -103,8 +103,40 @@ public class ConnectableTask {
|
||||
return connectable.getYieldExpiration() > System.currentTimeMillis();
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure processor has work to do. This means that it meets one of these criteria:
|
||||
* <ol>
|
||||
* <li>It is a Funnel and has incoming FlowFiles from other components, and and at least one outgoing connection.</li>
|
||||
* <li>It is a 'source' component, meaning:<ul>
|
||||
* <li>It is annotated with @TriggerWhenEmpty</li>
|
||||
* <li>It has no incoming connections</li>
|
||||
* <li>All incoming connections are self-loops</li>
|
||||
* </ul></li>
|
||||
* <li>It has data in incoming connections to process</li>
|
||||
* </ol>
|
||||
* @return true if there is work to do, otherwise false
|
||||
*/
|
||||
private boolean isWorkToDo() {
|
||||
return connectable.isTriggerWhenEmpty() || !connectable.hasIncomingConnection() || !hasNonLoopConnection || Connectables.flowFilesQueued(connectable);
|
||||
boolean hasNonLoopConnection = Connectables.hasNonLoopConnection(connectable);
|
||||
|
||||
if (connectable.getConnectableType() == ConnectableType.FUNNEL) {
|
||||
// Handle Funnel as a special case because it will never be a 'source' component,
|
||||
// and also its outgoing connections can not be terminated.
|
||||
// Incoming FlowFiles from other components, and at least one outgoing connection are required.
|
||||
return connectable.hasIncomingConnection()
|
||||
&& hasNonLoopConnection
|
||||
&& !connectable.getConnections().isEmpty()
|
||||
&& Connectables.flowFilesQueued(connectable);
|
||||
}
|
||||
|
||||
final boolean isSourceComponent = connectable.isTriggerWhenEmpty()
|
||||
// No input connections
|
||||
|| !connectable.hasIncomingConnection()
|
||||
// Every incoming connection loops back to itself, no inputs from other components
|
||||
|| !hasNonLoopConnection;
|
||||
|
||||
// If it is not a 'source' component, it requires a FlowFile to process.
|
||||
return isSourceComponent || Connectables.flowFilesQueued(connectable);
|
||||
}
|
||||
|
||||
private boolean isBackPressureEngaged() {
|
||||
@ -129,11 +161,7 @@ public class ConnectableTask {
|
||||
return InvocationResult.DO_NOT_YIELD;
|
||||
}
|
||||
|
||||
// Make sure processor has work to do. This means that it meets one of these criteria:
|
||||
// * It is annotated with @TriggerWhenEmpty
|
||||
// * It has data in an incoming Connection
|
||||
// * It has no incoming connections
|
||||
// * All incoming connections are self-loops
|
||||
// Make sure processor has work to do.
|
||||
if (!isWorkToDo()) {
|
||||
return InvocationResult.yield("No work to do");
|
||||
}
|
||||
|
@ -21,12 +21,17 @@ import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.nifi.components.state.StateManagerProvider;
|
||||
import org.apache.nifi.connectable.Connectable;
|
||||
import org.apache.nifi.connectable.ConnectableType;
|
||||
import org.apache.nifi.connectable.Connection;
|
||||
import org.apache.nifi.connectable.Funnel;
|
||||
import org.apache.nifi.controller.FlowController;
|
||||
import org.apache.nifi.controller.ProcessorNode;
|
||||
import org.apache.nifi.controller.queue.FlowFileQueue;
|
||||
@ -41,15 +46,9 @@ import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestConnectableTask {
|
||||
@Test
|
||||
public void testIsWorkToDo() {
|
||||
final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
|
||||
Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
|
||||
|
||||
final Processor processor = Mockito.mock(Processor.class);
|
||||
Mockito.when(procNode.getIdentifier()).thenReturn("123");
|
||||
Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
|
||||
|
||||
private ConnectableTask createTask(final Connectable connectable) {
|
||||
final FlowController flowController = Mockito.mock(FlowController.class);
|
||||
Mockito.when(flowController.getStateManagerProvider()).thenReturn(Mockito.mock(StateManagerProvider.class));
|
||||
|
||||
@ -61,9 +60,22 @@ public class TestConnectableTask {
|
||||
|
||||
final LifecycleState scheduleState = new LifecycleState();
|
||||
final StringEncryptor encryptor = Mockito.mock(StringEncryptor.class);
|
||||
ConnectableTask task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
|
||||
|
||||
return new ConnectableTask(Mockito.mock(SchedulingAgent.class), connectable,
|
||||
flowController, contextFactory, scheduleState, encryptor);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsWorkToDo() {
|
||||
final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
|
||||
Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
|
||||
|
||||
final Processor processor = Mockito.mock(Processor.class);
|
||||
Mockito.when(procNode.getIdentifier()).thenReturn("123");
|
||||
Mockito.when(procNode.getRunnableComponent()).thenReturn(processor);
|
||||
|
||||
// There is work to do because there are no incoming connections.
|
||||
final ConnectableTask task = createTask(procNode);
|
||||
assertFalse(task.invoke().isYield());
|
||||
|
||||
// Test with only a single connection that is self-looping and empty
|
||||
@ -93,8 +105,6 @@ public class TestConnectableTask {
|
||||
when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
|
||||
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
|
||||
|
||||
// Create a new ConnectableTask because we want to have a different value for the 'hasNonLoopConnection' value, which is calculated in the task's constructor.
|
||||
task = new ConnectableTask(Mockito.mock(SchedulingAgent.class), procNode, flowController, contextFactory, scheduleState, encryptor);
|
||||
assertTrue(task.invoke().isYield());
|
||||
|
||||
// test when the queue has data
|
||||
@ -106,4 +116,67 @@ public class TestConnectableTask {
|
||||
assertFalse(task.invoke().isYield());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsWorkToDoFunnels() {
|
||||
final Funnel funnel = Mockito.mock(Funnel.class);
|
||||
Mockito.when(funnel.hasIncomingConnection()).thenReturn(false);
|
||||
Mockito.when(funnel.getRunnableComponent()).thenReturn(funnel);
|
||||
Mockito.when(funnel.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
|
||||
Mockito.when(funnel.getIdentifier()).thenReturn("funnel-1");
|
||||
|
||||
final ConnectableTask task = createTask(funnel);
|
||||
assertTrue("If there is no incoming connection, it should be yielded.", task.invoke().isYield());
|
||||
|
||||
// Test with only a single connection that is self-looping and empty.
|
||||
// Actually, this self-loop input can not be created for Funnels using NiFi API because an outer layer check condition does not allow it.
|
||||
// But test it anyways.
|
||||
final Connection selfLoopingConnection = Mockito.mock(Connection.class);
|
||||
when(selfLoopingConnection.getSource()).thenReturn(funnel);
|
||||
when(selfLoopingConnection.getDestination()).thenReturn(funnel);
|
||||
|
||||
when(funnel.hasIncomingConnection()).thenReturn(true);
|
||||
when(funnel.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
|
||||
|
||||
final FlowFileQueue emptyQueue = Mockito.mock(FlowFileQueue.class);
|
||||
when(emptyQueue.isActiveQueueEmpty()).thenReturn(true);
|
||||
when(selfLoopingConnection.getFlowFileQueue()).thenReturn(emptyQueue);
|
||||
|
||||
final Set<Connection> outgoingConnections = new HashSet<>();
|
||||
outgoingConnections.add(selfLoopingConnection);
|
||||
when(funnel.getConnections()).thenReturn(outgoingConnections);
|
||||
|
||||
assertTrue("If there is no incoming connection from other components, it should be yielded.", task.invoke().isYield());
|
||||
|
||||
// Add an incoming connection from another component.
|
||||
final ProcessorNode inputProcessor = Mockito.mock(ProcessorNode.class);
|
||||
final Connection incomingFromAnotherComponent = Mockito.mock(Connection.class);
|
||||
when(incomingFromAnotherComponent.getSource()).thenReturn(inputProcessor);
|
||||
when(incomingFromAnotherComponent.getDestination()).thenReturn(funnel);
|
||||
when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(emptyQueue);
|
||||
|
||||
when(funnel.hasIncomingConnection()).thenReturn(true);
|
||||
when(funnel.getIncomingConnections()).thenReturn(Arrays.asList(selfLoopingConnection, incomingFromAnotherComponent));
|
||||
|
||||
assertTrue("Even if there is an incoming connection from another component," +
|
||||
" it should be yielded because there's no outgoing connections.", task.invoke().isYield());
|
||||
|
||||
// Add an outgoing connection to another component.
|
||||
final ProcessorNode outputProcessor = Mockito.mock(ProcessorNode.class);
|
||||
final Connection outgoingToAnotherComponent = Mockito.mock(Connection.class);
|
||||
when(outgoingToAnotherComponent.getSource()).thenReturn(funnel);
|
||||
when(outgoingToAnotherComponent.getDestination()).thenReturn(outputProcessor);
|
||||
outgoingConnections.add(outgoingToAnotherComponent);
|
||||
|
||||
assertTrue("Even if there is an incoming connection from another component and an outgoing connection as well," +
|
||||
" it should be yielded because there's no incoming FlowFiles to process.", task.invoke().isYield());
|
||||
|
||||
// Adding input FlowFiles.
|
||||
final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
|
||||
when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
|
||||
when(incomingFromAnotherComponent.getFlowFileQueue()).thenReturn(nonEmptyQueue);
|
||||
assertFalse("When a Funnel has both incoming and outgoing connections and FlowFiles to process, then it should be executed.",
|
||||
task.invoke().isYield());
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user