NIFI-1168: Ensure that processors with only looping

connections are scheduled to run, even if the connections have no FlowFiles;
 expose these details to processor developers; update documentation

Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
Mark Payne 2015-11-18 14:53:30 -05:00 committed by Aldrin Piri
parent 773576e041
commit 69bce2c2db
13 changed files with 207 additions and 24 deletions

View File

@ -24,14 +24,30 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <p>
* Marker annotation a {@link org.apache.nifi.processor.Processor Processor}
* implementation can use to indicate that the Processor should still be
* triggered even when it has no data in its work queue. By default, Processors
* which have no non-self incoming edges will be triggered even if there is no
* work in its queue. However, Processors that have non-self incoming edges will
* only be triggered if they have work in their queue or they present this
* annotation.
* triggered even when it has no data in its work queue.
* </p>
*
* <p>
* A Processor is scheduled to be triggered based on its configured Scheduling Period
* and Scheduling Strategy. However, when the scheduling period elapses, the Processor
* will not be scheduled if it has no work to do. Normally, a Processor is said to have
* work to do if one of the following circumstances is true:
* </p>
*
* <ul>
* <li>An incoming Connection has data in its queue</li>
* <li>The Processor has no incoming Connections.</li>
* <li>All incoming Connections are self-loops (both the source and destination of the Connection are the same Processor).
* </ul>
*
* <p>
* If the Processor needs to be triggered to run even when the above conditions are all
* <code>false</code>, the Processor's class can be annotated with this annotation, which
* will cause the Processor to be triggered, even if its incoming queues are empty.
* </p>
*/
@Documented
@Target({ElementType.TYPE})

View File

@ -132,10 +132,18 @@ public interface ProcessContext {
*/
boolean hasIncomingConnection();
/**
* @return <code>true</code> if the processor has one or more incoming connections for
* which the source of the connection is NOT the processor; returns <code>false</code> if
* the processor has no incoming connections or if all incoming connections are self-loops
* (i.e., the processor is also the source of all incoming connections).
*/
boolean hasNonLoopConnection();
/**
* @param relationship a relationship to check for connections
* @return true if the relationship has one or more outbound connections,
* false otherwise
* false otherwise
*/
boolean hasConnection(Relationship relationship);

View File

@ -48,7 +48,8 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
private boolean yieldCalled = false;
private boolean enableExpressionValidation = false;
private boolean allowExpressionValidation = true;
private boolean incomingConnection = true;
private volatile boolean incomingConnection = true;
private volatile boolean nonLoopConnection = true;
private volatile Set<Relationship> connections = new HashSet<>();
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
@ -305,6 +306,15 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
return this.connections.contains(relationship);
}
public void setNonLoopConnection(final boolean hasNonLoopConnection) {
this.nonLoopConnection = hasNonLoopConnection;
}
@Override
public boolean hasNonLoopConnection() {
return nonLoopConnection;
}
public void addConnection(final Relationship relationship) {
this.connections.add(relationship);
}

View File

@ -523,6 +523,11 @@ public class StandardProcessorTestRunner implements TestRunner {
context.setIncomingConnection(hasIncomingConnection);
}
@Override
public void setNonLoopConnection(final boolean hasNonLoopConnection) {
context.setNonLoopConnection(hasNonLoopConnection);
}
@Override
public void addConnection(Relationship relationship) {
context.addConnection(relationship);

View File

@ -492,10 +492,18 @@ public interface TestRunner {
* Indicates to the framework that the configured processor has one or more
* incoming connections.
*
* @param hasIncomingConnection whether or not the configured processor has an incoming connection
* @param hasIncomingConnection whether or not the configured processor should behave as though it has an incoming connection
*/
void setIncomingConnection(boolean hasIncomingConnection);
/**
* Indicates to the framework that the configured processor has one or more incoming
* connections for which the processor is not also the source.
*
* @param hasNonLoopConnection whether or not the configured processor should behave as though it has a non-looping incoming connection
*/
void setNonLoopConnection(boolean hasNonLoopConnection);
/**
* Indicates to the Framework that the configured processor has a connection for the given Relationship.
*

View File

@ -88,6 +88,11 @@ public class MockProcessContext implements ProcessContext {
return true;
}
@Override
public boolean hasNonLoopConnection() {
return true;
}
@Override
public boolean hasConnection(Relationship relationship) {
return false;

View File

@ -37,6 +37,7 @@ import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.Connectables;
/**
* This class is essentially an empty shell for {@link Connectable}s that are not Processors
@ -196,6 +197,11 @@ public class ConnectableProcessContext implements ProcessContext {
return connectable.hasIncomingConnection();
}
@Override
public boolean hasNonLoopConnection() {
return Connectables.hasNonLoopConnection(connectable);
}
@Override
public boolean hasConnection(Relationship relationship) {
Set<Connection> connections = connectable.getConnections(relationship);

View File

@ -72,30 +72,45 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
this.processContext = processContext;
}
static boolean isRunOnCluster(final ProcessorNode procNode, final boolean isClustered, final boolean isPrimary) {
return !procNode.isIsolated() || !isClustered || isPrimary;
}
static boolean isYielded(final ProcessorNode procNode) {
return procNode.getYieldExpiration() >= System.currentTimeMillis();
}
static boolean isWorkToDo(final ProcessorNode procNode) {
return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || !Connectables.hasNonLoopConnection(procNode) || Connectables.flowFilesQueued(procNode);
}
@Override
@SuppressWarnings("deprecation")
public Boolean call() {
// make sure processor is not yielded
boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
if (!shouldRun) {
if (isYielded(procNode)) {
return false;
}
// make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
if (!shouldRun) {
if (!isRunOnCluster(procNode, flowController.isClustered(), flowController.isPrimary())) {
return false;
}
// make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
if (!shouldRun) {
// Make sure processor has work to do. This means that it meets one of these criteria:
// * It is annotated with @TriggerWhenEmpty
// * It has data in an incoming Connection
// * It has no incoming connections
// * All incoming connections are self-loops
if (!isWorkToDo(procNode)) {
return true;
}
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
if (!context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {
return false;
}
}
final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS);
@ -112,10 +127,6 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
batch = false;
}
if (!shouldRun) {
return false;
}
scheduleState.incrementActiveThreadCount();
final long startNanos = System.nanoTime();
@ -123,6 +134,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
int invocationCount = 0;
try {
try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
boolean shouldRun = true;
while (shouldRun) {
procNode.onTrigger(processContext, sessionFactory);
invocationCount++;
@ -135,10 +147,14 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
return false;
}
shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
shouldRun = shouldRun && (procNode.getYieldExpiration() < System.currentTimeMillis());
if (!isWorkToDo(procNode)) {
break;
}
if (isYielded(procNode)) {
break;
}
if (shouldRun && numRelationships > 0) {
if (numRelationships > 0) {
final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
}

View File

@ -32,6 +32,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.util.Connectables;
public class StandardProcessContext implements ProcessContext, ControllerServiceLookup {
@ -184,6 +185,11 @@ public class StandardProcessContext implements ProcessContext, ControllerService
return procNode.hasIncomingConnection();
}
@Override
public boolean hasNonLoopConnection() {
return Connectables.hasNonLoopConnection(procNode);
}
@Override
public boolean hasConnection(Relationship relationship) {
Set<Connection> connections = procNode.getConnections(relationship);

View File

@ -117,6 +117,11 @@ public class StandardSchedulingContext implements SchedulingContext {
return processContext.hasIncomingConnection();
}
@Override
public boolean hasNonLoopConnection() {
return processContext.hasNonLoopConnection();
}
@Override
public boolean hasConnection(Relationship relationship) {
return processContext.hasConnection(relationship);

View File

@ -17,6 +17,7 @@
package org.apache.nifi.util;
import java.util.Collection;
import java.util.List;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
@ -53,4 +54,15 @@ public class Connectables {
return false;
}
public static boolean hasNonLoopConnection(final Connectable connectable) {
final List<Connection> connections = connectable.getIncomingConnections();
for (final Connection connection : connections) {
if (!connection.getSource().equals(connectable)) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.tasks;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
import java.util.Collections;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.junit.Test;
import org.mockito.Mockito;
public class TestContinuallyRunProcessorTask {
@Test
public void testIsWorkToDo() {
System.setProperty("nifi.properties.file.path", "src/test/resources/nifi.properties");
final ProcessorNode procNode = Mockito.mock(ProcessorNode.class);
Mockito.when(procNode.hasIncomingConnection()).thenReturn(false);
// There is work to do because there are no incoming connections.
assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
// Test with only a single connection that is self-looping and empty
final Connection selfLoopingConnection = Mockito.mock(Connection.class);
when(selfLoopingConnection.getSource()).thenReturn(procNode);
when(selfLoopingConnection.getDestination()).thenReturn(procNode);
when(procNode.hasIncomingConnection()).thenReturn(true);
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(selfLoopingConnection));
assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
// Test with only a single connection that is self-looping and empty
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
when(flowFileQueue.isActiveQueueEmpty()).thenReturn(true);
final FlowFileQueue nonEmptyQueue = Mockito.mock(FlowFileQueue.class);
when(nonEmptyQueue.isActiveQueueEmpty()).thenReturn(false);
when(selfLoopingConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
// Test with only a non-looping Connection that has no FlowFiles
final Connection emptyConnection = Mockito.mock(Connection.class);
when(emptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class));
when(emptyConnection.getDestination()).thenReturn(procNode);
when(emptyConnection.getFlowFileQueue()).thenReturn(flowFileQueue);
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(emptyConnection));
assertFalse(ContinuallyRunProcessorTask.isWorkToDo(procNode));
// test when the queue has data
final Connection nonEmptyConnection = Mockito.mock(Connection.class);
when(nonEmptyConnection.getSource()).thenReturn(Mockito.mock(ProcessorNode.class));
when(nonEmptyConnection.getDestination()).thenReturn(procNode);
when(nonEmptyConnection.getFlowFileQueue()).thenReturn(nonEmptyQueue);
when(procNode.getIncomingConnections()).thenReturn(Collections.singletonList(nonEmptyConnection));
assertTrue(ContinuallyRunProcessorTask.isWorkToDo(procNode));
}
}

View File

@ -129,7 +129,11 @@ public class ExecuteSQL extends AbstractProcessor {
FlowFile incoming = null;
if (context.hasIncomingConnection()) {
incoming = session.get();
if (incoming == null) {
// If we have no FlowFile, and all incoming connections are self-loops then we can continue on.
// However, if we have no FlowFile and we have connections coming from other Processors, then
// we know that we should run only if we have a FlowFile.
if (incoming == null && context.hasNonLoopConnection()) {
return;
}
}