NIFI-4064 Make sure that Funnels with queued incoming FlowFiles, but no outgoing connections yield rather than continually check to run.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1914.
This commit is contained in:
Wesley-Lawrence 2017-06-14 11:57:07 -04:00 committed by Pierre Villard
parent 2800df30d3
commit 5a7e6c6ac1
3 changed files with 101 additions and 3 deletions

View File

@ -67,10 +67,11 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
// 4. There is a connection for each relationship.
final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
boolean flowFilesQueued = true;
boolean funnelWithoutConnections = false;
boolean relationshipAvailable = true;
final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
&& (triggerWhenEmpty || (flowFilesQueued = Connectables.flowFilesQueued(connectable)))
&& (connectable.getConnectableType() != ConnectableType.FUNNEL || !connectable.getConnections().isEmpty())
&& (connectable.getConnectableType() != ConnectableType.FUNNEL || !(funnelWithoutConnections = connectable.getConnections().isEmpty()))
&& (connectable.getRelationships().isEmpty() || (relationshipAvailable = Connectables.anyRelationshipAvailable(connectable)));
if (shouldRun) {
@ -100,8 +101,8 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
scheduleState.decrementActiveThreadCount();
}
} else if (!flowFilesQueued || !relationshipAvailable) {
// Either there are no FlowFiles queued, or the relationship is not available (i.e., backpressure is applied).
} else if (!flowFilesQueued || funnelWithoutConnections || !relationshipAvailable) {
// Either there are no FlowFiles queued, it's a funnel without outgoing connections, or the relationship is not available (i.e., backpressure is applied).
// We will yield for just a bit.
return true;
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.scheduling;
public class DummyScheduleState extends ScheduleState {
public DummyScheduleState(boolean isScheduled) {
setScheduled(isScheduled);
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.tasks;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.scheduling.DummyScheduleState;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.Collections;
public class TestContinuallyRunConnectableTask {
@Test
public void funnelsShouldYieldWhenNoOutboundConnections() {
// Incoming connection with FlowFile
final FlowFileQueue flowFileQueueNonEmpty = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueueNonEmpty.isActiveQueueEmpty()).thenReturn(false);
final Connection connectionNonEmpty = Mockito.mock(Connection.class);
Mockito.when(connectionNonEmpty.getFlowFileQueue()).thenReturn(flowFileQueueNonEmpty);
// Create a Funnel with an inbound connection, and no outbound connections
final Funnel testFunnelNoOutbound = Mockito.mock(Funnel.class);
Mockito.when(testFunnelNoOutbound.getIncomingConnections()).thenReturn(Collections.singletonList(connectionNonEmpty));
Mockito.when(testFunnelNoOutbound.getConnections()).thenReturn(Collections.emptySet());
// Set the Funnel to be yielding up to 5 seconds ago
Mockito.when(testFunnelNoOutbound.getYieldExpiration()).thenReturn(System.currentTimeMillis() - 5000);
// Set the Funnel 'isTriggeredWhenEmpty' to false (same as what 'StandardFunnel' returns)
Mockito.when(testFunnelNoOutbound.isTriggerWhenEmpty()).thenReturn(false);
// Set the Funnel connection type
Mockito.when(testFunnelNoOutbound.getConnectableType()).thenReturn(ConnectableType.FUNNEL);
// Set the Funnel relationships to Anonymous (same as what 'StandardFunnel' returns)
Mockito.when(testFunnelNoOutbound.getRelationships()).thenReturn(Collections.singletonList(Relationship.ANONYMOUS));
// Create Mock 'ProcessContextFactory', and 'ProcessContext'
final ProcessContextFactory pcf = Mockito.mock(ProcessContextFactory.class);
Mockito.when(pcf.newProcessContext(Mockito.any(), Mockito.any())).thenReturn(null);
final ProcessContext pc = Mockito.mock(ProcessContext.class);
// Create ContinuallyRunConnectableTask
ContinuallyRunConnectableTask crct = new ContinuallyRunConnectableTask(pcf, testFunnelNoOutbound, new DummyScheduleState(true), pc);
// We should yield since this Funnel has no outbound connections.
Assert.assertTrue("Didn't yield when a Funnel has no outbound connections.", crct.call());
}
}