From 359fd3ff299c6abb7e7b4d5dfb99e48570aeede5 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 20 May 2020 17:09:40 -0400 Subject: [PATCH] NIFI-7476: Implemented FlowFileGating / FlowFileConcurrency at the ProcessGroup level Added FlowFileOutboundPolicy to ProcessGroups and updated LocalPort to make use of it Persisted FlowFile Concurrency and FlowFile Output Policy to flow.xml.gz and included in flow fingerprint Added configuration for FlowFile concurrency and outbound policy to UI for configuration of Process Groups Added system tests. Fixed a couple of bugs that were found Fixed a couple of typos in the RecordPath guide Signed-off-by: Pierre Villard This closes #4306. --- .../nifi/web/api/dto/ProcessGroupDTO.java | 21 +++ .../nifi/groups/FlowFileConcurrency.java | 39 +++++ .../org/apache/nifi/groups/FlowFileGate.java | 26 +++ .../nifi/groups/FlowFileOutboundPolicy.java | 35 ++++ .../org/apache/nifi/groups/ProcessGroup.java | 42 +++++ .../apache/nifi/connectable/LocalPort.java | 131 ++++++++++++--- .../nifi/controller/StandardFlowSnippet.java | 13 ++ .../controller/StandardFlowSynchronizer.java | 31 ++++ .../serialization/FlowFromDOMFactory.java | 2 + .../serialization/StandardFlowSerializer.java | 2 + .../nifi/fingerprint/FingerprintFactory.java | 2 + .../groups/SingleConcurrencyFlowFileGate.java | 57 +++++++ .../nifi/groups/StandardProcessGroup.java | 93 +++++++++++ .../nifi/groups/UnboundedFlowFileGate.java | 29 ++++ .../src/main/resources/FlowConfiguration.xsd | 22 ++- .../nifi/connectable/TestLocalPort.java | 14 +- .../service/mock/MockProcessGroup.java | 45 ++++++ .../apache/nifi/web/api/dto/DtoFactory.java | 4 + .../web/dao/impl/StandardProcessGroupDAO.java | 14 +- .../canvas/process-group-configuration.jsp | 18 +++ .../css/process-group-configuration.css | 8 + .../canvas/nf-process-group-configuration.js | 45 +++++- .../nifi/tests/system/NiFiClientUtil.java | 72 ++++++++- .../nifi/tests/system/NiFiSystemIT.java | 31 +++- .../pg/SingleFlowFileConcurrencyIT.java | 152 ++++++++++++++++++ .../impl/client/nifi/OutputPortClient.java | 6 + .../nifi/impl/JerseyOutputPortClient.java | 5 + 27 files changed, 921 insertions(+), 38 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java index 349d0bb31b..1b3bfe20cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java @@ -34,6 +34,8 @@ public class ProcessGroupDTO extends ComponentDTO { private Map variables; private VersionControlInformationDTO versionControlInformation; private ParameterContextReferenceEntity parameterContext; + private String flowfileConcurrency; + private String flowfileOutboundPolicy; private Integer runningCount; private Integer stoppedCount; @@ -352,4 +354,23 @@ public class ProcessGroupDTO extends ComponentDTO { public void setParameterContext(final ParameterContextReferenceEntity parameterContext) { this.parameterContext = parameterContext; } + + @ApiModelProperty(value = "The FlowFile Concurrency for this Process Group.", allowableValues = "UNBOUNDED, SINGLE_FLOWFILE_PER_NODE") + public String getFlowfileConcurrency() { + return flowfileConcurrency; + } + + public void setFlowfileConcurrency(final String flowfileConcurrency) { + this.flowfileConcurrency = flowfileConcurrency; + } + + @ApiModelProperty(value = "The Oubound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.", + allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT") + public String getFlowfileOutboundPolicy() { + return flowfileOutboundPolicy; + } + + public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) { + this.flowfileOutboundPolicy = flowfileOutboundPolicy; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java new file mode 100644 index 0000000000..6c91c66a10 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +/** + * Specifies the concurrency level of a Process Group + */ +public enum FlowFileConcurrency { + + /** + * Only a single FlowFile is to be allowed to enter the Process Group at a time. + * While that FlowFile may be split into many or spawn many children, no additional FlowFiles will be + * allowed to enter the Process Group through a Local Input Port until the previous FlowFile - and all of its + * child/descendent FlowFiles - have been processed. In a clustered instance, each node may allow through + * a single FlowFile at a time, so multiple FlowFiles may still be processed concurrently across the cluster. + */ + SINGLE_FLOWFILE_PER_NODE, + + /** + * The number of FlowFiles that can be processed concurrently is unbounded. + */ + UNBOUNDED; + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java new file mode 100644 index 0000000000..e700851883 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +public interface FlowFileGate { + + boolean tryClaim(); + + void releaseClaim(); + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java new file mode 100644 index 0000000000..767dfae478 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +public enum FlowFileOutboundPolicy { + + /** + * FlowFiles that are queued up to be transferred out of a ProcessGroup by an Output Port will be transferred + * out of the Process Group as soon as they are available. + */ + STREAM_WHEN_AVAILABLE, + + /** + * FlowFiles that are queued up to be transferred out of a Process Group by an Output Port will remain queued until + * all FlowFiles in the Process Group are ready to be transferred out of the group. The FlowFiles will then be transferred + * out of the group. I.e., the FlowFiles will be batched together and transferred at the same time (not necessarily in a single + * Process Session) but no FlowFile will be transferred until all FlowFiles in the group are ready to be transferred. + */ + BATCH_OUTPUT; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index a386b49d51..3e111cfcf7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -1062,4 +1062,46 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param updatedParameters a Map of parameter name to the ParameterUpdate that describes how the Parameter was updated */ void onParameterContextUpdated(Map updatedParameters); + + /** + * @return the FlowFileGate that must be used for obtaining a claim before an InputPort is allowed to bring data into a ProcessGroup + */ + FlowFileGate getFlowFileGate(); + + /** + * @return the FlowFileConcurrency that is currently configured for the ProcessGroup + */ + FlowFileConcurrency getFlowFileConcurrency(); + + /** + * Sets the FlowFileConcurrency to use for this ProcessGroup + * @param flowFileConcurrency the FlowFileConcurrency to use + */ + void setFlowFileConcurrency(FlowFileConcurrency flowFileConcurrency); + + /** + * @return the FlowFile Outbound Policy that governs the behavior of this Process Group + */ + FlowFileOutboundPolicy getFlowFileOutboundPolicy(); + + /** + * Specifies the FlowFile Outbound Policy that should be applied to this Process Group + * @param outboundPolicy the policy to enforce. + */ + void setFlowFileOutboundPolicy(FlowFileOutboundPolicy outboundPolicy); + + /** + * @return true if at least one FlowFile resides in a FlowFileQueue in this Process Group or a child ProcessGroup, false otherwise + */ + boolean isDataQueued(); + + /** + * Indicates whether or not data is queued for Processing. Data is considered queued for processing if it is enqueued in a Connection and + * the destination of that Connection is not an Output Port, OR if the data is enqueued within a child group, regardless of whether or not it is + * queued before an Output Port. I.e., any data that is enqueued in this Process Group is enqueued for Processing unless it is ready to be transferred + * out of this Process Group. + * + * @return true if there is data that is queued for Processing, false otherwise + */ + boolean isDataQueuedForProcessing(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index 7f1173e486..c636f921c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -20,11 +20,16 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileGate; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -39,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * NiFi. */ public class LocalPort extends AbstractPort { + private static final Logger logger = LoggerFactory.getLogger(LocalPort.class); // "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure // the number of concurrent tasks to schedule for local ports and funnels. @@ -51,7 +57,7 @@ public class LocalPort extends AbstractPort { private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - final int maxIterations; + private final int maxIterations; public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { super(id, name, type, scheduler); @@ -61,8 +67,12 @@ public class LocalPort extends AbstractPort { int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000")); maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0)); + setYieldPeriod(nifiProperties.getBoredYieldDuration()); } + protected int getMaxIterations() { + return maxIterations; + } private boolean[] validateConnections() { // LocalPort requires both in/out. @@ -109,31 +119,107 @@ public class LocalPort extends AbstractPort { public void onTrigger(final ProcessContext context, final ProcessSession session) { readLock.lock(); try { - Set available = context.getAvailableRelationships(); - int iterations = 0; - while (!available.isEmpty()) { - final List flowFiles = session.get(1000); - if (flowFiles.isEmpty()) { - break; - } - - session.transfer(flowFiles, Relationship.ANONYMOUS); - session.commit(); - - // If there are fewer than 1,000 FlowFiles available to transfer, or if we - // have hit the configured FlowFile cap, we want to stop. This prevents us from - // holding the Timer-Driven Thread for an excessive amount of time. - if (flowFiles.size() < 1000 || ++iterations >= maxIterations) { - break; - } - - available = context.getAvailableRelationships(); + if (getConnectableType() == ConnectableType.OUTPUT_PORT) { + triggerOutputPort(context, session); + } else { + triggerInputPort(context, session); } } finally { readLock.unlock(); } } + private void triggerOutputPort(final ProcessContext context, final ProcessSession session) { + final boolean shouldTransfer = isTransferDataOut(); + if (shouldTransfer) { + transferUnboundedConcurrency(context, session); + } else { + context.yield(); + } + } + + private void triggerInputPort(final ProcessContext context, final ProcessSession session) { + final FlowFileGate flowFileGate = getProcessGroup().getFlowFileGate(); + final boolean obtainedClaim = flowFileGate.tryClaim(); + if (!obtainedClaim) { + logger.trace("{} failed to obtain claim for FlowFileGate. Will yield and will not transfer any FlowFiles", this); + context.yield(); + return; + } + + try { + logger.trace("{} obtained claim for FlowFileGate", this); + + final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency(); + switch (flowFileConcurrency) { + case UNBOUNDED: + transferUnboundedConcurrency(context, session); + break; + case SINGLE_FLOWFILE_PER_NODE: + transferSingleFlowFile(session); + break; + } + } finally { + flowFileGate.releaseClaim(); + logger.trace("{} released claim for FlowFileGate", this); + } + } + + private boolean isTransferDataOut() { + final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency(); + if (flowFileConcurrency == FlowFileConcurrency.UNBOUNDED) { + logger.trace("{} will transfer data out of Process Group because FlowFile Concurrency is Unbounded", this); + return true; + } + + final FlowFileOutboundPolicy outboundPolicy = getProcessGroup().getFlowFileOutboundPolicy(); + if (outboundPolicy == FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE) { + logger.trace("{} will transfer data out of Process Group because FlowFile Outbound Policy is Stream When Available", this); + return true; + } + + final boolean queuedForProcessing = getProcessGroup().isDataQueuedForProcessing(); + if (queuedForProcessing) { + logger.trace("{} will not transfer data out of Process Group because FlowFile Outbound Policy is Batch Output and there is data queued for Processing", this); + return false; + } + + logger.trace("{} will transfer data out of Process Group because there is no data queued for processing", this); + return true; + } + + private void transferSingleFlowFile(final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + session.transfer(flowFile, Relationship.ANONYMOUS); + } + + protected void transferUnboundedConcurrency(final ProcessContext context, final ProcessSession session) { + Set available = context.getAvailableRelationships(); + int iterations = 0; + while (!available.isEmpty()) { + final List flowFiles = session.get(1000); + if (flowFiles.isEmpty()) { + break; + } + + session.transfer(flowFiles, Relationship.ANONYMOUS); + session.commit(); + + // If there are fewer than 1,000 FlowFiles available to transfer, or if we + // have hit the configured FlowFile cap, we want to stop. This prevents us from + // holding the Timer-Driven Thread for an excessive amount of time. + if (flowFiles.size() < 1000 || ++iterations >= maxIterations) { + break; + } + + available = context.getAvailableRelationships(); + } + } + @Override public void updateConnection(final Connection connection) throws IllegalStateException { writeLock.lock(); @@ -223,4 +309,9 @@ public class LocalPort extends AbstractPort { public String getComponentType() { return "Local Port"; } + + @Override + public String toString() { + return "LocalPort[id=" + getIdentifier() + ", type=" + getConnectableType() + "]"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java index a65583f8e5..85e598485e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java @@ -31,6 +31,8 @@ import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; @@ -454,6 +456,17 @@ public class StandardFlowSnippet implements FlowSnippet { childGroup.setPosition(toPosition(groupDTO.getPosition())); childGroup.setComments(groupDTO.getComments()); childGroup.setName(groupDTO.getName()); + + final String flowfileConcurrentName = groupDTO.getFlowfileConcurrency(); + if (flowfileConcurrentName != null) { + childGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrentName)); + } + + final String outboundPolicyName = groupDTO.getFlowfileOutboundPolicy(); + if (outboundPolicyName != null) { + childGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(outboundPolicyName)); + } + if (groupDTO.getVariables() != null) { childGroup.setVariables(groupDTO.getVariables()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index c9460f8d48..7bb5052696 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -57,6 +57,8 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; @@ -1156,6 +1158,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final String name = dto.getName(); final PositionDTO position = dto.getPosition(); final String comments = dto.getComments(); + final String flowfileConcurrencyName = dto.getFlowfileConcurrency(); + final String flowfileOutboundPolicyName = dto.getFlowfileOutboundPolicy(); if (name != null) { group.setName(name); @@ -1167,6 +1171,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { group.setComments(comments); } + if (flowfileConcurrencyName == null) { + group.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED); + } else { + group.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName)); + } + + if (flowfileOutboundPolicyName == null) { + group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE); + } else { + group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName)); + } + final ParameterContextReferenceEntity parameterContextReference = dto.getParameterContext(); if (parameterContextReference != null && parameterContextReference.getId() != null) { final String parameterContextId = parameterContextReference.getId(); @@ -1274,6 +1290,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { parentGroup.addProcessGroup(processGroup); } + final String flowfileConcurrencyName = processGroupDTO.getFlowfileConcurrency(); + final String flowfileOutboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy(); + if (flowfileConcurrencyName == null) { + processGroup.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED); + } else { + processGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName)); + } + + if (flowfileOutboundPolicyName == null) { + processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE); + } else { + processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName)); + } + + final String parameterContextId = getString(processGroupElement, "parameterContextId"); if (parameterContextId != null) { final ParameterContext parameterContext = controller.getFlowManager().getParameterContextManager().getParameterContext(parameterContextId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index e64e573bfd..84d8cf4a71 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -183,6 +183,8 @@ public class FlowFromDOMFactory { dto.setName(getString(element, "name")); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); dto.setComments(getString(element, "comment")); + dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency")); + dto.setFlowfileOutboundPolicy(getString(element, "flowfileOutboundPolicy")); final Map variables = new HashMap<>(); final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index a9b203e9ac..920ccf5a3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -234,6 +234,8 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", group.getName()); addPosition(element, group.getPosition()); addTextElement(element, "comment", group.getComments()); + addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name()); + addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name()); final VersionControlInformation versionControlInfo = group.getVersionControlInformation(); if (versionControlInfo != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index c3c95e60cf..7a11c51b5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -350,6 +350,8 @@ public class FingerprintFactory { appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy")); final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation"); if (versionControlInfo == null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java new file mode 100644 index 0000000000..da1041afcf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; + +public class SingleConcurrencyFlowFileGate implements FlowFileGate { + private final BooleanSupplier groupEmptyCheck; + private final AtomicBoolean claimed = new AtomicBoolean(false); + + public SingleConcurrencyFlowFileGate(final BooleanSupplier groupEmptyCheck) { + this.groupEmptyCheck = groupEmptyCheck; + } + + @Override + public boolean tryClaim() { + // Check if the claim is already held and atomically set it to being held. + final boolean alreadyClaimed = claimed.getAndSet(true); + if (alreadyClaimed) { + // If claim was already held, then this thread failed to obtain the claim. + return false; + } + + // The claim is now held by this thread. Check if the ProcessGroup is empty. + final boolean empty = groupEmptyCheck.getAsBoolean(); + if (empty) { + // Process Group is empty so return true indicating that the claim is now held. + return true; + } + + // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false, + // indicating that the caller did not obtain the claim. + claimed.set(false); + return false; + } + + @Override + public void releaseClaim() { + claimed.set(false); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 0c091d8f66..da535275a8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -196,6 +196,10 @@ public final class StandardProcessGroup implements ProcessGroup { private final VersionControlFields versionControlFields = new VersionControlFields(); private volatile ParameterContext parameterContext; + private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED; + private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate(); + private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE; + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); @@ -5280,4 +5284,93 @@ public final class StandardProcessGroup implements ProcessGroup { } } } + + @Override + public FlowFileGate getFlowFileGate() { + return flowFileGate; + } + + @Override + public FlowFileConcurrency getFlowFileConcurrency() { + readLock.lock(); + try { + return flowFileConcurrency; + } finally { + readLock.unlock(); + } + } + + @Override + public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) { + writeLock.lock(); + try { + if (this.flowFileConcurrency == flowFileConcurrency) { + return; + } + + this.flowFileConcurrency = flowFileConcurrency; + switch (flowFileConcurrency) { + case UNBOUNDED: + flowFileGate = new UnboundedFlowFileGate(); + break; + case SINGLE_FLOWFILE_PER_NODE: + flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued()); + break; + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean isDataQueued() { + return isDataQueued(connection -> true); + } + + @Override + public boolean isDataQueuedForProcessing() { + // Data is queued for processing if a connection has data queued and the connection's destination is NOT an Output Port. + return isDataQueued(connection -> connection.getDestination().getConnectableType() != ConnectableType.OUTPUT_PORT); + } + + private boolean isDataQueued(final Predicate connectionFilter) { + readLock.lock(); + try { + for (final Connection connection : this.connections.values()) { + // If the connection doesn't pass the filter, just skip over it. + if (!connectionFilter.test(connection)) { + continue; + } + + final boolean queueEmpty = connection.getFlowFileQueue().isEmpty(); + if (!queueEmpty) { + return true; + } + } + + for (final ProcessGroup child : this.processGroups.values()) { + // Check if the child Process Group has any data enqueued. Note that we call #isDataQueued here and NOT + // #isDataQueeudForProcesing. I.e., regardless of whether this is called from #isDataQueued or #isDataQueuedForProcessing, + // for child groups, we only call #isDataQueued. This is because if data is queued up for the Output Port of a child group, + // it is still considered to be data that is being processed by this Process Group. + if (child.isDataQueued()) { + return true; + } + } + + return false; + } finally { + readLock.unlock(); + } + } + + @Override + public FlowFileOutboundPolicy getFlowFileOutboundPolicy() { + return flowFileOutboundPolicy; + } + + @Override + public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) { + this.flowFileOutboundPolicy = flowFileOutboundPolicy; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java new file mode 100644 index 0000000000..352053beb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +public class UnboundedFlowFileGate implements FlowFileGate { + @Override + public boolean tryClaim() { + return true; + } + + @Override + public void releaseClaim() { + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index ca43ebce19..36d7d92d9a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -178,6 +178,8 @@ + + @@ -203,6 +205,21 @@ + + + + + + + + + + + + + + + @@ -216,8 +233,7 @@ + - RootProcessGroupType doesn't have versionControlInformation --> @@ -225,6 +241,8 @@ + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java index 25b8929cf8..aa6ffe4867 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java @@ -37,14 +37,14 @@ public class TestLocalPort { public void testDefaultValues() { LocalPort port = getLocalInputPort(); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(10, port.maxIterations); + assertEquals(10, port.getMaxIterations()); } @Test public void testSetConcurrentTasks() { LocalPort port = getLocalInputPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2"); assertEquals(2, port.getMaxConcurrentTasks()); - assertEquals(10, port.maxIterations); + assertEquals(10, port.getMaxIterations()); } @Test @@ -52,27 +52,27 @@ public class TestLocalPort { { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(100, port.maxIterations); + assertEquals(100, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(101, port.maxIterations); + assertEquals(101, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(100, port.maxIterations); + assertEquals(100, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(1, port.maxIterations); + assertEquals(1, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(1, port.maxIterations); + assertEquals(1, port.getMaxIterations()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 2955838fa0..c5f05bc267 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -32,6 +32,9 @@ import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileGate; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; @@ -719,6 +722,48 @@ public class MockProcessGroup implements ProcessGroup { public void onParameterContextUpdated(final Map updatedParameters) { } + @Override + public FlowFileGate getFlowFileGate() { + return new FlowFileGate() { + @Override + public boolean tryClaim() { + return true; + } + + @Override + public void releaseClaim() { + } + }; + } + + @Override + public FlowFileConcurrency getFlowFileConcurrency() { + return FlowFileConcurrency.UNBOUNDED; + } + + @Override + public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) { + } + + @Override + public FlowFileOutboundPolicy getFlowFileOutboundPolicy() { + return FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE; + } + + @Override + public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy outboundPolicy) { + } + + @Override + public boolean isDataQueued() { + return false; + } + + @Override + public boolean isDataQueuedForProcessing() { + return false; + } + @Override public void terminateProcessor(ProcessorNode processor) { } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 2557241d8e..9d70f9e63a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -2477,6 +2477,8 @@ public final class DtoFactory { dto.setName(group.getName()); dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null)); dto.setVersionControlInformation(createVersionControlInformationDto(group)); + dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name()); + dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name()); final ParameterContext parameterContext = group.getParameterContext(); if (parameterContext != null) { @@ -4284,6 +4286,8 @@ public final class DtoFactory { copy.setOutputPortCount(original.getOutputPortCount()); copy.setParentGroupId(original.getParentGroupId()); copy.setVersionedComponentId(original.getVersionedComponentId()); + copy.setFlowfileConcurrency(original.getFlowfileConcurrency()); + copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy()); copy.setRunningCount(original.getRunningCount()); copy.setStoppedCount(original.getStoppedCount()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index b5f5bcfae2..ba2c6c232b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -25,6 +25,8 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.parameter.ParameterContext; @@ -335,6 +337,11 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final String name = processGroupDTO.getName(); final String comments = processGroupDTO.getComments(); + final String concurrencyName = processGroupDTO.getFlowfileConcurrency(); + final FlowFileConcurrency flowFileConcurrency = concurrencyName == null ? null : FlowFileConcurrency.valueOf(concurrencyName); + + final String outboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy(); + final FlowFileOutboundPolicy flowFileOutboundPolicy = outboundPolicyName == null ? null : FlowFileOutboundPolicy.valueOf(outboundPolicyName); final ParameterContextReferenceEntity parameterContextReference = processGroupDTO.getParameterContext(); if (parameterContextReference != null) { @@ -364,7 +371,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou if (isNotNull(comments)) { group.setComments(comments); } - + if (flowFileConcurrency != null) { + group.setFlowFileConcurrency(flowFileConcurrency); + } + if (flowFileOutboundPolicy != null) { + group.setFlowFileOutboundPolicy(flowFileOutboundPolicy); + } group.onComponentModified(); return group; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp index b21ba0c102..eb35513796 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp @@ -54,6 +54,24 @@ +
+
Process group FlowFile concurrency
+
+
+
+
+ +
+
+
+
Process group outbound policy
+
+
+
+
+ +
+
Apply
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css index ac968b4311..b6322f6195 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css @@ -85,6 +85,14 @@ width: 328px; } +#process-group-flowfile-concurrency-combo { + width: 328px; +} + +#process-group-outbound-policy-combo { + width: 328px; +} + #process-group-comments { height: 100px; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js index c4dbfd8c4f..79af2aba0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js @@ -105,7 +105,9 @@ 'comments': $('#process-group-comments').val(), 'parameterContext': { 'id': $('#process-group-parameter-context-combo').combo('getSelectedOption').value - } + }, + 'flowfileConcurrency': $('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value, + 'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value } }; @@ -212,6 +214,41 @@ // populate the process group settings $('#process-group-name').removeClass('unset').val(processGroup.name); $('#process-group-comments').removeClass('unset').val(processGroup.comments); + $('#process-group-flowfile-concurrency-combo').removeClass('unset').combo({ + options: [{ + text: 'Single FlowFile Per Node', + value: 'SINGLE_FLOWFILE_PER_NODE', + description: 'Only a single FlowFile is to be allowed to enter the Process Group at a time on each node in the cluster. While that FlowFile may be split into many or ' + + 'spawn many children, no additional FlowFiles will be allowed to enter the Process Group through a Local Input Port until the previous FlowFile ' + + '- and all of its child/descendent FlowFiles - have been processed.' + }, { + text: 'Unbounded', + value: 'UNBOUNDED', + description: 'The number of FlowFiles that can be processed concurrently is unbounded.' + }], + selectedOption: { + value: processGroup.flowfileConcurrency + } + }); + + $('#process-group-outbound-policy-combo').removeClass('unset').combo({ + options: [{ + text: 'Stream When Available', + value: 'STREAM_WHEN_AVAILABLE', + description: 'FlowFiles that are queued up to be transferred out of a ProcessGroup by an Output Port will be transferred out ' + + 'of the Process Group as soon as they are available.' + }, { + text: 'Batch Output', + value: 'BATCH_OUTPUT', + description: 'FlowFiles that are queued up to be transferred out of a Process Group by an Output Port will remain queued until ' + + 'all FlowFiles in the Process Group are ready to be transferred out of the group. The FlowFiles will then be transferred ' + + 'out of the group. This setting will be ignored if the FlowFile Concurrency is Unbounded.' + }], + selectedOption: { + value: processGroup.flowfileOutboundPolicy + } + }); + // populate the header $('#process-group-configuration-header-text').text(processGroup.name + ' Configuration'); @@ -228,6 +265,12 @@ $('#read-only-process-group-name').text(processGroup.name); $('#read-only-process-group-comments').text(processGroup.comments); + var concurrencyName = processGroup.flowfileConcurrency == "UNBOUNDED" ? "Unbounded" : "Single FlowFile Per Node"; + $('#read-only-process-group-flowfile-concurrency').text(concurrencyName); + + var outboundPolicyName = processGroup.flowfileOutboundPolicy == "BATCH_OUTPUT" ? "Batch Output" : "Stream When Available"; + $('#read-only-process-group-outbound-policy').text(outboundPolicyName); + // populate the header $('#process-group-configuration-header-text').text(processGroup.name + ' Configuration'); } else { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index a9806a0c3c..3c5d4f960e 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -17,6 +17,7 @@ package org.apache.nifi.tests.system; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.QueueSize; @@ -380,12 +381,21 @@ public class NiFiClientUtil { return counterValues; } + public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException { + final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity(); + scheduleComponentsEntity.setId(groupId); + scheduleComponentsEntity.setState("RUNNING"); + final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents("root", scheduleComponentsEntity); + + return scheduleEntity; + } + public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException { final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity(); - scheduleComponentsEntity.setId("root"); + scheduleComponentsEntity.setId(groupId); scheduleComponentsEntity.setState("STOPPED"); final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents("root", scheduleComponentsEntity); - waitForProcessorsStopped("root"); + waitForProcessorsStopped(groupId); return scheduleEntity; } @@ -536,6 +546,18 @@ public class NiFiClientUtil { } } + public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination) throws NiFiClientException, IOException { + return createConnection(source, destination, Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName())); + } + + public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination) throws NiFiClientException, IOException { + return createConnection(source, destination, Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName())); + } + + public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final String relationship) throws NiFiClientException, IOException { + return createConnection(source, destination, Collections.singleton(relationship)); + } + public ConnectionEntity createConnection(final ProcessorEntity source, final ProcessorEntity destination, final String relationship) throws NiFiClientException, IOException { return createConnection(source, destination, Collections.singleton(relationship)); } @@ -548,27 +570,41 @@ public class NiFiClientUtil { return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); } + public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final Set relationships) throws NiFiClientException, IOException { + return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); + } + + public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination, final Set relationships) throws NiFiClientException, IOException { + return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); + } + + public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination, final Set relationships) throws NiFiClientException, IOException { + return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); + } + public ConnectionEntity createConnection(final ConnectableDTO source, final ConnectableDTO destination, final Set relationships) throws NiFiClientException, IOException { + final String groupId = "OUTPUT_PORT".equals(source.getType()) ? destination.getGroupId() : source.getGroupId(); + final ConnectionDTO connectionDto = new ConnectionDTO(); connectionDto.setSelectedRelationships(relationships); connectionDto.setDestination(destination); connectionDto.setSource(source); - connectionDto.setParentGroupId(source.getGroupId()); + connectionDto.setParentGroupId(groupId); final ConnectionEntity connectionEntity = new ConnectionEntity(); connectionEntity.setComponent(connectionDto); connectionEntity.setDestinationGroupId(destination.getGroupId()); connectionEntity.setDestinationId(destination.getId()); - connectionEntity.setDestinationType("PROCESSOR"); + connectionEntity.setDestinationType(destination.getType()); connectionEntity.setSourceGroupId(source.getGroupId()); connectionEntity.setSourceId(source.getId()); - connectionEntity.setDestinationType("PROCESSOR"); + connectionEntity.setSourceType(source.getType()); connectionEntity.setRevision(createNewRevision()); - return nifiClient.getConnectionClient().createConnection(source.getGroupId(), connectionEntity); + return nifiClient.getConnectionClient().createConnection(groupId, connectionEntity); } public ConnectableDTO createConnectableDTO(final ProcessorEntity processor) { @@ -778,6 +814,30 @@ public class NiFiClientUtil { return childGroup; } + public PortEntity createInputPort(final String name, final String groupId) throws NiFiClientException, IOException { + final PortDTO component = new PortDTO(); + component.setName(name); + component.setParentGroupId(groupId); + + final PortEntity inputPortEntity = new PortEntity(); + inputPortEntity.setRevision(createNewRevision()); + inputPortEntity.setComponent(component); + + return nifiClient.getInputPortClient().createInputPort(groupId, inputPortEntity); + } + + public PortEntity createOutputPort(final String name, final String groupId) throws NiFiClientException, IOException { + final PortDTO component = new PortDTO(); + component.setName(name); + component.setParentGroupId(groupId); + + final PortEntity outputPortEntity = new PortEntity(); + outputPortEntity.setRevision(createNewRevision()); + outputPortEntity.setComponent(component); + + return nifiClient.getOutputPortClient().createOutputPort(groupId, outputPortEntity); + } + public ProvenanceEntity queryProvenance(final Map searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException { final Map searchTermsAsStrings = searchTerms.entrySet().stream() .collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue)); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index 1e37b018d9..a57d4ea5b5 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -30,18 +30,26 @@ import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; import java.util.regex.Matcher; import java.util.regex.Pattern; public abstract class NiFiSystemIT { + private static final Logger logger = LoggerFactory.getLogger(NiFiSystemIT.class); + private final ConcurrentMap lastLogTimestamps = new ConcurrentHashMap<>(); + public static final int CLIENT_API_PORT = 5671; public static final String NIFI_GROUP_ID = "org.apache.nifi"; public static final String TEST_EXTENSIONS_ARTIFACT_ID = "nifi-system-test-extensions-nar"; @@ -131,6 +139,8 @@ public abstract class NiFiSystemIT { } protected void waitForAllNodesConnected(final int expectedNumberOfNodes, final long sleepMillis) { + logger.info("Waiting for {} nodes to connect", expectedNumberOfNodes); + final NiFiClient client = getNifiClient(); final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60); @@ -142,6 +152,8 @@ public abstract class NiFiSystemIT { return; } + logEverySecond("Waiting for {} nodes to connect but currently on {} nodes are connected", expectedNumberOfNodes, connectedNodeCount); + if (System.currentTimeMillis() > maxTime) { throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); } @@ -262,10 +274,27 @@ public abstract class NiFiSystemIT { } protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException { + logger.info("Waiting for Queue Count of {} on Connection {}", queueSize, connectionId); + waitFor(() -> { final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); - return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == queueSize; + final int currentSize = statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued(); + final String sourceName = statusEntity.getConnectionStatus().getSourceName(); + final String destinationName = statusEntity.getConnectionStatus().getDestinationName(); + logEverySecond("Current Queue Size for Connection from {} to {} = {}, Waiting for {}", sourceName, destinationName, currentSize, queueSize); + + return currentSize == queueSize; }); + + logger.info("Queue Count for Connection {} is now {}", connectionId, queueSize); + } + + private void logEverySecond(final String message, final Object... args) { + final Long lastLogTime = lastLogTimestamps.get(message); + if (lastLogTime == null || lastLogTime < System.currentTimeMillis() - 1000L) { + logger.info(message, args); + lastLogTimestamps.put(message, System.currentTimeMillis()); + } } private ConnectionStatusEntity getConnectionStatus(final String connectionId) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java new file mode 100644 index 0000000000..02e8bbc787 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.pg; + +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { + + + @Test + public void testSingleConcurrency() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); + final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); + final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("Batch Size", "3")); + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + + final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupEntity.getId()); + getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "5 sec")); + + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Connect Generate -> Input Port -> Sleep -> Output Port -> Terminate + // Since we will use Single FlowFile at a time concurrency, we should see that the connection between Input Port and Sleep + // never has more than 1 FlowFile even though the Sleep processor takes a long time. + final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success"); + final ConnectionEntity inputToSleep = getClientUtil().createConnection(inputPort, sleep); + getClientUtil().createConnection(sleep, outputPort, "success"); + final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate); + + processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name()); + getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity); + + // Start all components except for Terminate. We want the data to queue up before terminate so we can ensure that the + // correct number of FlowFiles are queued up. + getClientUtil().startProcessGroupComponents("root"); + getNifiClient().getProcessorClient().stopProcessor(terminate); + + // Wait for 1 FlowFile to queue up for the Sleep Processor. This should leave 2 FlowFiles queued up for the input port. + waitForQueueCount(inputToSleep.getId(), 1); + assertEquals(2, getConnectionQueueSize(generateToInput.getId())); + + // Wait until only 1 FlowFile is queued up for the input port. Because Sleep should take 5 seconds to complete its job, + // It should take 5 seconds for this to happen. But it won't be exact. So we'll ensure that it takes at least 3 seconds. We could + // put an upper bound such as 6 or 7 seconds as well, but it's a good idea to avoid that because the tests may run in some environments + // with constrained resources that may take a lot longer to run. + final long startTime = System.currentTimeMillis(); + waitForQueueCount(generateToInput.getId(), 1); + final long endTime = System.currentTimeMillis(); + final long delay = endTime - startTime; + assertTrue(delay > 3000L); + + assertEquals(1, getConnectionQueueSize(inputToSleep.getId())); + + waitForQueueCount(outputToTerminate.getId(), 2); + + // Wait until all FlowFiles have been ingested. + waitForQueueCount(generateToInput.getId(), 0); + assertEquals(1, getConnectionQueueSize(inputToSleep.getId())); + + // Ensure that 3 FlowFiles are queued up for Terminate + waitForQueueCount(outputToTerminate.getId(), 3); + } + + + @Test + public void testSingleConcurrencyAndBatchOutput() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); + final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); + final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); + final PortEntity secondOut = getClientUtil().createOutputPort("Out2", processGroupEntity.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + + final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupEntity.getId()); // sleep with default configuration is just a simple pass-through + + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Connect Generate -> Input Port -> Count -> Output Port -> Terminate + // Also connect InputPort -> Out2 -> Terminate + final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success"); + final ConnectionEntity inputToSleep = getClientUtil().createConnection(inputPort, sleep); + final ConnectionEntity sleepToOutput = getClientUtil().createConnection(sleep, outputPort, "success"); + final ConnectionEntity inputToSecondOut = getClientUtil().createConnection(inputPort, secondOut); + final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate); + final ConnectionEntity secondOutToTerminate = getClientUtil().createConnection(secondOut, terminate); + + processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name()); + processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name()); + getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity); + + // Start generate so that data is created. Start Input Port so that the data is ingested. + // Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor, + // and that should prevent data from being transferred by Output Port "Out2" also. + getNifiClient().getProcessorClient().startProcessor(generate); + getNifiClient().getInputPortClient().startInputPort(inputPort); + getNifiClient().getOutputPortClient().startOutputPort(outputPort); + getNifiClient().getOutputPortClient().startOutputPort(secondOut); + + waitForQueueCount(inputToSleep.getId(), 1); + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + + // Wait 3 seconds to ensure that data is never transferred + for (int i=0; i < 3; i++) { + Thread.sleep(1000L); + assertEquals(1, getConnectionQueueSize(inputToSleep.getId())); + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + } + + // Start Sleep + getNifiClient().getProcessorClient().startProcessor(sleep); + + // Data should now flow from both output ports. + waitForQueueCount(inputToSleep.getId(), 0); + waitForQueueCount(inputToSecondOut.getId(), 0); + + assertEquals(1, getConnectionQueueSize(outputToTerminate.getId())); + assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId())); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java index b635404e2c..1bfab8778a 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java @@ -30,7 +30,13 @@ public interface OutputPortClient { PortEntity deleteOutputPort(PortEntity entity) throws NiFiClientException, IOException; + /** + * @deprecated use startOutputPort + */ + @Deprecated PortEntity startInpuOutputPort(PortEntity entity) throws NiFiClientException, IOException; + PortEntity startOutputPort(PortEntity entity) throws NiFiClientException, IOException; + PortEntity stopOutputPort(PortEntity entity) throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java index 46637fcf56..645c95f9a2 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java @@ -62,6 +62,11 @@ public class JerseyOutputPortClient extends CRUDJerseyClient impleme @Override public PortEntity startInpuOutputPort(final PortEntity entity) throws NiFiClientException, IOException { + return startOutputPort(entity); + } + + @Override + public PortEntity startOutputPort(final PortEntity entity) throws NiFiClientException, IOException { final PortEntity startEntity = createStateEntity(entity, "RUNNING"); return updateOutputPort(startEntity); }