diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java index 231ec427a7..9084a6a335 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java @@ -37,6 +37,7 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; import java.util.ArrayList; import java.util.Collection; @@ -57,11 +58,15 @@ import static java.util.Objects.requireNonNull; public class StandardFunnel implements Funnel { - public static final long MINIMUM_PENALIZATION_MILLIS = 0L; public static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.MILLISECONDS; - public static final long MINIMUM_YIELD_MILLIS = 0L; - public static final long DEFAULT_YIELD_PERIOD = 1000L; - public static final TimeUnit DEFAULT_YIELD_TIME_UNIT = TimeUnit.MILLISECONDS; + + // "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure + // the number of concurrent tasks to schedule for local ports and funnels. + static final String MAX_CONCURRENT_TASKS_PROP_NAME = "_nifi.funnel.max.concurrent.tasks"; + + // "_nifi.funnel.max.transferred.flowfiles" is an experimental NiFi property allowing users to configure + // the maximum number of FlowFiles transferred each time a funnel or local port runs (rounded up to the nearest 1000). + static final String MAX_TRANSFERRED_FLOWFILES_PROP_NAME = "_nifi.funnel.max.transferred.flowfiles"; private final String identifier; private final Set outgoingConnections; @@ -84,9 +89,12 @@ public class StandardFunnel implements Funnel { private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - public StandardFunnel(final String identifier, final ProcessGroup processGroup, final ProcessScheduler scheduler) { + final int maxIterations; + private final int maxConcurrentTasks; + + public StandardFunnel(final String identifier, final NiFiProperties nifiProperties) { this.identifier = identifier; - this.processGroupRef = new AtomicReference<>(processGroup); + this.processGroupRef = new AtomicReference<>(); outgoingConnections = new HashSet<>(); incomingConnections = new ArrayList<>(); @@ -104,6 +112,10 @@ public class StandardFunnel implements Funnel { schedulingPeriod = new AtomicReference<>("0 millis"); schedulingNanos = new AtomicLong(MINIMUM_SCHEDULING_NANOS); name = new AtomicReference<>("Funnel"); + + maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1")); + int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000")); + maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0)); } @Override @@ -381,9 +393,9 @@ public class StandardFunnel implements Funnel { session.commit(); // If there are fewer than 1,000 FlowFiles available to transfer, or if we - // have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from + // have hit the configured FlowFile cap, we want to stop. This prevents us from // holding the Timer-Driven Thread for an excessive amount of time. - if (flowFiles.size() < 1000 || ++iterations >= 10) { + if (flowFiles.size() < 1000 || ++iterations >= maxIterations) { break; } @@ -403,7 +415,7 @@ public class StandardFunnel implements Funnel { @Override public int getMaxConcurrentTasks() { - return 1; + return maxConcurrentTasks; } @Override diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFunnel.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFunnel.java new file mode 100644 index 0000000000..5daa08b9cb --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFunnel.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller; + +import org.apache.nifi.util.NiFiProperties; +import org.junit.Test; + +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class TestStandardFunnel { + @Test + public void testDefaultValues() { + StandardFunnel funnel = getStandardFunnel("", ""); + assertEquals(1, funnel.getMaxConcurrentTasks()); + assertEquals(10, funnel.maxIterations); + } + + @Test + public void testSetConcurrentTasks() { + StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_CONCURRENT_TASKS_PROP_NAME, "2"); + assertEquals(2, funnel.getMaxConcurrentTasks()); + assertEquals(10, funnel.maxIterations); + } + + @Test + public void testSetFlowFileLimit() { + { + StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000"); + assertEquals(1, funnel.getMaxConcurrentTasks()); + assertEquals(100, funnel.maxIterations); + } + { + StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001"); + assertEquals(1, funnel.getMaxConcurrentTasks()); + assertEquals(101, funnel.maxIterations); + } + { + StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999"); + assertEquals(1, funnel.getMaxConcurrentTasks()); + assertEquals(100, funnel.maxIterations); + } + { + StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0"); + assertEquals(1, funnel.getMaxConcurrentTasks()); + assertEquals(1, funnel.maxIterations); + } + { + StandardFunnel funnel = getStandardFunnel(StandardFunnel.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1"); + assertEquals(1, funnel.getMaxConcurrentTasks()); + assertEquals(1, funnel.maxIterations); + } + } + + private StandardFunnel getStandardFunnel(String name, String value) { + HashMap additionalProperties = new HashMap<>(); + additionalProperties.put(name, value); + NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties); + return new StandardFunnel("1", niFiProperties); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index 5db8286126..724364d764 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -20,11 +20,11 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.NiFiProperties; import java.util.ArrayList; import java.util.Collection; @@ -40,12 +40,27 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; */ public class LocalPort extends AbstractPort { + // "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure + // the number of concurrent tasks to schedule for local ports and funnels. + static final String MAX_CONCURRENT_TASKS_PROP_NAME = "_nifi.funnel.max.concurrent.tasks"; + + // "_nifi.funnel.max.transferred.flowfiles" is an experimental NiFi property allowing users to configure + // the maximum number of FlowFiles transferred each time a funnel or local port runs (rounded up to the nearest 1000). + static final String MAX_TRANSFERRED_FLOWFILES_PROP_NAME = "_nifi.funnel.max.transferred.flowfiles"; + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); + final int maxIterations; - public LocalPort(final String id, final String name, final ProcessGroup processGroup, final ConnectableType type, final ProcessScheduler scheduler) { - super(id, name, processGroup, type, scheduler); + public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { + super(id, name, null, type, scheduler); + + int maxConcurrentTasks = Integer.parseInt(nifiProperties.getProperty(MAX_CONCURRENT_TASKS_PROP_NAME, "1")); + setMaxConcurrentTasks(maxConcurrentTasks); + + int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000")); + maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0)); } @Override @@ -91,9 +106,9 @@ public class LocalPort extends AbstractPort { session.commit(); // If there are fewer than 1,000 FlowFiles available to transfer, or if we - // have hit a cap of 10,000 FlowFiles, we want to stop. This prevents us from + // have hit the configured FlowFile cap, we want to stop. This prevents us from // holding the Timer-Driven Thread for an excessive amount of time. - if (flowFiles.size() < 1000 || ++iterations >= 10) { + if (flowFiles.size() < 1000 || ++iterations >= maxIterations) { break; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java index f100092ddd..e4e8f5e0e8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/flow/StandardFlowManager.java @@ -186,21 +186,21 @@ public class StandardFlowManager implements FlowManager { } public Funnel createFunnel(final String id) { - return new StandardFunnel(id.intern(), null, processScheduler); + return new StandardFunnel(id.intern(), nifiProperties); } public Port createLocalInputPort(String id, String name) { id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); - return new LocalPort(id, name, null, ConnectableType.INPUT_PORT, processScheduler); + return new LocalPort(id, name, ConnectableType.INPUT_PORT, processScheduler, nifiProperties); } public Port createLocalOutputPort(String id, String name) { id = requireNonNull(id).intern(); name = requireNonNull(name).intern(); verifyPortIdDoesNotExist(id); - return new LocalPort(id, name, null, ConnectableType.OUTPUT_PORT, processScheduler); + return new LocalPort(id, name, ConnectableType.OUTPUT_PORT, processScheduler, nifiProperties); } public ProcessGroup createProcessGroup(final String id) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java new file mode 100644 index 0000000000..ab8bb89a81 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.connectable; + +import org.apache.nifi.util.NiFiProperties; +import org.junit.Test; + +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; + +public class TestLocalPort { + + @Test + public void testDefaultValues() { + LocalPort port = getLocalPort("", ""); + assertEquals(1, port.getMaxConcurrentTasks()); + assertEquals(10, port.maxIterations); + } + + @Test + public void testSetConcurrentTasks() { + LocalPort port = getLocalPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2"); + assertEquals(2, port.getMaxConcurrentTasks()); + assertEquals(10, port.maxIterations); + } + + @Test + public void testSetFlowFileLimit() { + { + LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000"); + assertEquals(1, port.getMaxConcurrentTasks()); + assertEquals(100, port.maxIterations); + } + { + LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001"); + assertEquals(1, port.getMaxConcurrentTasks()); + assertEquals(101, port.maxIterations); + } + { + LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999"); + assertEquals(1, port.getMaxConcurrentTasks()); + assertEquals(100, port.maxIterations); + } + { + LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0"); + assertEquals(1, port.getMaxConcurrentTasks()); + assertEquals(1, port.maxIterations); + } + { + LocalPort port = getLocalPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1"); + assertEquals(1, port.getMaxConcurrentTasks()); + assertEquals(1, port.maxIterations); + } + } + + private LocalPort getLocalPort(String name, String value) { + HashMap additionalProperties = new HashMap<>(); + additionalProperties.put(name, value); + NiFiProperties niFiProperties = NiFiProperties.createBasicNiFiProperties(null, additionalProperties); + return new LocalPort("1", "test", ConnectableType.INPUT_PORT, null, niFiProperties); + } +} \ No newline at end of file