NIFI-6157: Allowing configuration of concurrent tasks and transferred FlowFile count to Funnels and LocalPorts

This closes #3412
Signed-off-by: Brandon <devriesb@apache.org>
This commit is contained in:
Brandon Devries 2019-04-04 11:16:37 -04:00 committed by Brandon
parent bfcc0ebd03
commit cbc07eb663
5 changed files with 199 additions and 17 deletions

View File

@ -37,6 +37,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import java.util.ArrayList;
import java.util.Collection;
@ -57,11 +58,15 @@ import static java.util.Objects.requireNonNull;
public class StandardFunnel implements Funnel {
public static final long MINIMUM_PENALIZATION_MILLIS = 0L;
public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS;
public static final long MINIMUM_YIELD_MILLIS = 0L;
public static final long DEFAULT_YIELD_PERIOD = 1000L;
public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS;
// "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure
// the number of concurrent tasks to schedule for local ports and funnels.
static final String MAX_CONCURRENT_TASKS_PROP_NAME = "_nifi.funnel.max.concurrent.tasks";
// "_nifi.funnel.max.transferred.flowfiles" is an experimental NiFi property allowing users to configure
// the maximum number of FlowFiles transferred each time a funnel or local port runs (rounded up to the nearest 1000).
static final String MAX_TRANSFERRED_FLOWFILES_PROP_NAME = "_nifi.funnel.max.transferred.flowfiles";
private final String identifier;
private final Set<Connection> outgoingConnections;
@ -84,9 +89,12 @@ public class StandardFunnel implements Funnel {
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) {
final int maxIterations;
private final int maxConcurrentTasks;
public StandardFunnel(final String identifier, final NiFiProperties nifiProperties) {
this.identifier = identifier;
this.processGroupRef = new AtomicReference<>(processGroup);
this.processGroupRef = new AtomicReference<>();
outgoingConnections = new HashSet<>();
incomingConnections = new ArrayList<>();
@ -104,6 +112,10 @@ public class StandardFunnel implements Funnel {
schedulingPeriod = new AtomicReference<>("0 millis");
schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS);
name = new AtomicReference<>("Funnel");
maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1"));
int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000"));
maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0));
}
@Override
@ -381,9 +393,9 @@ public class StandardFunnel implements Funnel {
session.commit();
// If there are fewer than 1,000 FlowFiles available to transfer, or if we
// have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from
// have hit the configured FlowFile cap, we want to stop. This prevents us from
// holding the Timer-Driven Thread for an excessive amount of time.
if (flowFiles.size() < 1000 || ++iterations >= 10) {
if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
break;
}
@ -403,7 +415,7 @@ public class StandardFunnel implements Funnel {
@Override
public int getMaxConcurrentTasks() {
return 1;
return maxConcurrentTasks;
}
@Override

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Test;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
public class TestStandardFunnel {
@Test
public void testDefaultValues() {
StandardFunnel funnel = getStandardFunnel("", "");
assertEquals(1, funnel.getMaxConcurrentTasks());
assertEquals(10, funnel.maxIterations);
}
@Test
public void testSetConcurrentTasks() {
StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_CONCURRENT_TASKS_PROP_NAME, "2");
assertEquals(2, funnel.getMaxConcurrentTasks());
assertEquals(10, funnel.maxIterations);
}
@Test
public void testSetFlowFileLimit() {
{
StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000");
assertEquals(1, funnel.getMaxConcurrentTasks());
assertEquals(100, funnel.maxIterations);
}
{
StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001");
assertEquals(1, funnel.getMaxConcurrentTasks());
assertEquals(101, funnel.maxIterations);
}
{
StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999");
assertEquals(1, funnel.getMaxConcurrentTasks());
assertEquals(100, funnel.maxIterations);
}
{
StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0");
assertEquals(1, funnel.getMaxConcurrentTasks());
assertEquals(1, funnel.maxIterations);
}
{
StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1");
assertEquals(1, funnel.getMaxConcurrentTasks());
assertEquals(1, funnel.maxIterations);
}
}
private StandardFunnel getStandardFunnel(String name, String value) {
HashMap<String, String> additionalProperties = new HashMap<>();
additionalProperties.put(name, value);
NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties);
return new StandardFunnel("1", niFiProperties);
}
}

View File

@ -20,11 +20,11 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import java.util.ArrayList;
import java.util.Collection;
@ -40,12 +40,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*/
public class LocalPort extends AbstractPort {
// "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure
// the number of concurrent tasks to schedule for local ports and funnels.
static final String MAX_CONCURRENT_TASKS_PROP_NAME = "_nifi.funnel.max.concurrent.tasks";
// "_nifi.funnel.max.transferred.flowfiles" is an experimental NiFi property allowing users to configure
// the maximum number of FlowFiles transferred each time a funnel or local port runs (rounded up to the nearest 1000).
static final String MAX_TRANSFERRED_FLOWFILES_PROP_NAME = "_nifi.funnel.max.transferred.flowfiles";
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
final int maxIterations;
public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) {
super(id, name, processGroup, type, scheduler);
public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) {
super(id, name, null, type, scheduler);
int maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1"));
setMaxConcurrentTasks(maxConcurrentTasks);
int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000"));
maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0));
}
@Override
@ -91,9 +106,9 @@ public class LocalPort extends AbstractPort {
session.commit();
// If there are fewer than 1,000 FlowFiles available to transfer, or if we
// have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from
// have hit the configured FlowFile cap, we want to stop. This prevents us from
// holding the Timer-Driven Thread for an excessive amount of time.
if (flowFiles.size() < 1000 || ++iterations >= 10) {
if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
break;
}

View File

@ -186,21 +186,21 @@ public class StandardFlowManager implements FlowManager {
}
public Funnel createFunnel(final String id) {
return new StandardFunnel(id.intern(), null, processScheduler);
return new StandardFunnel(id.intern(), nifiProperties);
}
public Port createLocalInputPort(String id, String name) {
id = requireNonNull(id).intern();
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler);
return new LocalPort(id, name, ConnectableType.INPUT_PORT, processScheduler, nifiProperties);
}
public Port createLocalOutputPort(String id, String name) {
id = requireNonNull(id).intern();
name = requireNonNull(name).intern();
verifyPortIdDoesNotExist(id);
return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler);
return new LocalPort(id, name, ConnectableType.OUTPUT_PORT, processScheduler, nifiProperties);
}
public ProcessGroup createProcessGroup(final String id) {

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.connectable;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Test;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
public class TestLocalPort {
@Test
public void testDefaultValues() {
LocalPort port = getLocalPort("", "");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(10, port.maxIterations);
}
@Test
public void testSetConcurrentTasks() {
LocalPort port = getLocalPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2");
assertEquals(2, port.getMaxConcurrentTasks());
assertEquals(10, port.maxIterations);
}
@Test
public void testSetFlowFileLimit() {
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(100, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(101, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(100, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(1, port.maxIterations);
}
{
LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(1, port.maxIterations);
}
}
private LocalPort getLocalPort(String name, String value) {
HashMap<String, String> additionalProperties = new HashMap<>();
additionalProperties.put(name, value);
NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties);
return new LocalPort("1", "test", ConnectableType.INPUT_PORT, null, niFiProperties);
}
}