diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java index f442b80eb5..6fac03ba75 100644 --- a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java @@ -24,14 +24,30 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** + *

* Marker annotation a {@link org.apache.nifi.processor.Processor Processor} * implementation can use to indicate that the Processor should still be - * triggered even when it has no data in its work queue. By default, Processors - * which have no non-self incoming edges will be triggered even if there is no - * work in its queue. However, Processors that have non-self incoming edges will - * only be triggered if they have work in their queue or they present this - * annotation. + * triggered even when it has no data in its work queue. + *

* + *

+ * A Processor is scheduled to be triggered based on its configured Scheduling Period + * and Scheduling Strategy. However, when the scheduling period elapses, the Processor + * will not be scheduled if it has no work to do. Normally, a Processor is said to have + * work to do if one of the following circumstances is true: + *

+ * + * + * + *

+ * If the Processor needs to be triggered to run even when the above conditions are all + * false, the Processor's class can be annotated with this annotation, which + * will cause the Processor to be triggered, even if its incoming queues are empty. + *

*/ @Documented @Target({ElementType.TYPE}) diff --git a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java index bb69c65cea..7488b2d9a2 100644 --- a/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java +++ b/nifi-api/src/main/java/org/apache/nifi/processor/ProcessContext.java @@ -132,10 +132,18 @@ public interface ProcessContext { */ boolean hasIncomingConnection(); + /** + * @return true if the processor has one or more incoming connections for + * which the source of the connection is NOT the processor; returns false if + * the processor has no incoming connections or if all incoming connections are self-loops + * (i.e., the processor is also the source of all incoming connections). + */ + boolean hasNonLoopConnection(); + /** * @param relationship a relationship to check for connections * @return true if the relationship has one or more outbound connections, - * false otherwise + * false otherwise */ boolean hasConnection(Relationship relationship); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index a350ecbe10..49021d17af 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -48,7 +48,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean yieldCalled = false; private boolean enableExpressionValidation = false; private boolean allowExpressionValidation = true; - private boolean incomingConnection = true; + private volatile boolean incomingConnection = true; + private volatile boolean nonLoopConnection = true; private volatile Set connections = new HashSet<>(); private volatile Set unavailableRelationships = new HashSet<>(); @@ -305,6 +306,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S return this.connections.contains(relationship); } + public void setNonLoopConnection(final boolean hasNonLoopConnection) { + this.nonLoopConnection = hasNonLoopConnection; + } + + @Override + public boolean hasNonLoopConnection() { + return nonLoopConnection; + } + public void addConnection(final Relationship relationship) { this.connections.add(relationship); } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 0d00cc8590..2f384ba4d7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -523,6 +523,11 @@ public class StandardProcessorTestRunner implements TestRunner { context.setIncomingConnection(hasIncomingConnection); } + @Override + public void setNonLoopConnection(final boolean hasNonLoopConnection) { + context.setNonLoopConnection(hasNonLoopConnection); + } + @Override public void addConnection(Relationship relationship) { context.addConnection(relationship); diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index ec901fea03..b1e7c8c7a7 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -492,10 +492,18 @@ public interface TestRunner { * Indicates to the framework that the configured processor has one or more * incoming connections. * - * @param hasIncomingConnection whether or not the configured processor has an incoming connection + * @param hasIncomingConnection whether or not the configured processor should behave as though it has an incoming connection */ void setIncomingConnection(boolean hasIncomingConnection); + /** + * Indicates to the framework that the configured processor has one or more incoming + * connections for which the processor is not also the source. + * + * @param hasNonLoopConnection whether or not the configured processor should behave as though it has a non-looping incoming connection + */ + void setNonLoopConnection(boolean hasNonLoopConnection); + /** * Indicates to the Framework that the configured processor has a connection for the given Relationship. * diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java index 3238d4ac0a..38c9fc9476 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessContext.java @@ -88,6 +88,11 @@ public class MockProcessContext implements ProcessContext { return true; } + @Override + public boolean hasNonLoopConnection() { + return true; + } + @Override public boolean hasConnection(Relationship relationship) { return false; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java index da7162e91f..08e25040b4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/ConnectableProcessContext.java @@ -37,6 +37,7 @@ import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.Connectables; /** * This class is essentially an empty shell for {@link Connectable}s that are not Processors @@ -196,6 +197,11 @@ public class ConnectableProcessContext implements ProcessContext { return connectable.hasIncomingConnection(); } + @Override + public boolean hasNonLoopConnection() { + return Connectables.hasNonLoopConnection(connectable); + } + @Override public boolean hasConnection(Relationship relationship) { Set connections = connectable.getConnections(relationship); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index d4d595b8b1..dd12824095 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -72,30 +72,45 @@ public class ContinuallyRunProcessorTask implements Callable { this.processContext = processContext; } + static boolean isRunOnCluster(final ProcessorNode procNode, final boolean isClustered, final boolean isPrimary) { + return !procNode.isIsolated() || !isClustered || isPrimary; + } + + static boolean isYielded(final ProcessorNode procNode) { + return procNode.getYieldExpiration() >= System.currentTimeMillis(); + } + + static boolean isWorkToDo(final ProcessorNode procNode) { + return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || !Connectables.hasNonLoopConnection(procNode) || Connectables.flowFilesQueued(procNode); + } + @Override @SuppressWarnings("deprecation") public Boolean call() { // make sure processor is not yielded - boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis()); - if (!shouldRun) { + if (isYielded(procNode)) { return false; } // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node - shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary(); - if (!shouldRun) { + if (!isRunOnCluster(procNode, flowController.isClustered(), flowController.isPrimary())) { return false; } - // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty - shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); - if (!shouldRun) { + // Make sure processor has work to do. This means that it meets one of these criteria: + // * It is annotated with @TriggerWhenEmpty + // * It has data in an incoming Connection + // * It has no incoming connections + // * All incoming connections are self-loops + if (!isWorkToDo(procNode)) { return true; } if (numRelationships > 0) { final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships; - shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships); + if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) { + return false; + } } final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); @@ -112,10 +127,6 @@ public class ContinuallyRunProcessorTask implements Callable { batch = false; } - if (!shouldRun) { - return false; - } - scheduleState.incrementActiveThreadCount(); final long startNanos = System.nanoTime(); @@ -123,6 +134,7 @@ public class ContinuallyRunProcessorTask implements Callable { int invocationCount = 0; try { try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { + boolean shouldRun = true; while (shouldRun) { procNode.onTrigger(processContext, sessionFactory); invocationCount++; @@ -135,10 +147,14 @@ public class ContinuallyRunProcessorTask implements Callable { return false; } - shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); - shouldRun = shouldRun && (procNode.getYieldExpiration() < System.currentTimeMillis()); + if (!isWorkToDo(procNode)) { + break; + } + if (isYielded(procNode)) { + break; + } - if (shouldRun && numRelationships > 0) { + if (numRelationships > 0) { final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships; shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java index 76849bdae3..a6302ca8fd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessContext.java @@ -32,6 +32,7 @@ import org.apache.nifi.controller.ControllerServiceLookup; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.encrypt.StringEncryptor; +import org.apache.nifi.util.Connectables; public class StandardProcessContext implements ProcessContext, ControllerServiceLookup { @@ -184,6 +185,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService return procNode.hasIncomingConnection(); } + @Override + public boolean hasNonLoopConnection() { + return Connectables.hasNonLoopConnection(procNode); + } + @Override public boolean hasConnection(Relationship relationship) { Set connections = procNode.getConnections(relationship); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java index 27d1264516..a3c2a5debd 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java @@ -117,6 +117,11 @@ public class StandardSchedulingContext implements SchedulingContext { return processContext.hasIncomingConnection(); } + @Override + public boolean hasNonLoopConnection() { + return processContext.hasNonLoopConnection(); + } + @Override public boolean hasConnection(Relationship relationship) { return processContext.hasConnection(relationship); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java index c4d040b1a1..5d74ebb481 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java @@ -17,6 +17,7 @@ package org.apache.nifi.util; import java.util.Collection; +import java.util.List; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Connection; @@ -53,4 +54,15 @@ public class Connectables { return false; } + + public static boolean hasNonLoopConnection(final Connectable connectable) { + final List connections = connectable.getIncomingConnections(); + for (final Connection connection : connections) { + if (!connection.getSource().equals(connectable)) { + return true; + } + } + + return false; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java new file mode 100644 index 0000000000..174e5fbc11 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/tasks/TestContinuallyRunProcessorTask.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.tasks; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.ProcessorNode; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestContinuallyRunProcessorTask { + + @Test + public void testIsWorkToDo() { + System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties"); + + final ProcessorNode procNode = Mockito.mock(ProcessorNode.class); + Mockito.when(procNode.hasIncomingConnection()).thenReturn(false); + + // There is work to do because there are no incoming connections. + assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode)); + + // Test with only a single connection that is self-looping and empty + final Connection selfLoopingConnection = Mockito.mock(Connection.class); + when(selfLoopingConnection.getSource()).thenReturn(procNode); + when(selfLoopingConnection.getDestination()).thenReturn(procNode); + + when(procNode.hasIncomingConnection()).thenReturn(true); + when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection)); + assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode)); + + // Test with only a single connection that is self-looping and empty + final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true); + + final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class); + when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false); + + when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue); + assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode)); + + + // Test with only a non-looping Connection that has no FlowFiles + final Connection emptyConnection = Mockito.mock(Connection.class); + when(emptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class)); + when(emptyConnection.getDestination()).thenReturn(procNode); + + when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue); + when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection)); + assertFalse(ContinuallyRunProcessorTask.isWorkToDo(procNode)); + + // test when the queue has data + final Connection nonEmptyConnection = Mockito.mock(Connection.class); + when(nonEmptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class)); + when(nonEmptyConnection.getDestination()).thenReturn(procNode); + when(nonEmptyConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue); + when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(nonEmptyConnection)); + assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode)); + } + +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java index 452df4298b..9aa9d598c7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java @@ -129,7 +129,11 @@ public class ExecuteSQL extends AbstractProcessor { FlowFile incoming = null; if (context.hasIncomingConnection()) { incoming = session.get(); - if (incoming == null) { + + // If we have no FlowFile, and all incoming connections are self-loops then we can continue on. + // However, if we have no FlowFile and we have connections coming from other Processors, then + // we know that we should run only if we have a FlowFile. + if (incoming == null && context.hasNonLoopConnection()) { return; } }