diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java index 349d0bb31b..1b3bfe20cf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/ProcessGroupDTO.java @@ -34,6 +34,8 @@ public class ProcessGroupDTO extends ComponentDTO { private Map variables; private VersionControlInformationDTO versionControlInformation; private ParameterContextReferenceEntity parameterContext; + private String flowfileConcurrency; + private String flowfileOutboundPolicy; private Integer runningCount; private Integer stoppedCount; @@ -352,4 +354,23 @@ public class ProcessGroupDTO extends ComponentDTO { public void setParameterContext(final ParameterContextReferenceEntity parameterContext) { this.parameterContext = parameterContext; } + + @ApiModelProperty(value = "The FlowFile Concurrency for this Process Group.", allowableValues = "UNBOUNDED, SINGLE_FLOWFILE_PER_NODE") + public String getFlowfileConcurrency() { + return flowfileConcurrency; + } + + public void setFlowfileConcurrency(final String flowfileConcurrency) { + this.flowfileConcurrency = flowfileConcurrency; + } + + @ApiModelProperty(value = "The Oubound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.", + allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT") + public String getFlowfileOutboundPolicy() { + return flowfileOutboundPolicy; + } + + public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) { + this.flowfileOutboundPolicy = flowfileOutboundPolicy; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java new file mode 100644 index 0000000000..6c91c66a10 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileConcurrency.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +/** + * Specifies the concurrency level of a Process Group + */ +public enum FlowFileConcurrency { + + /** + * Only a single FlowFile is to be allowed to enter the Process Group at a time. + * While that FlowFile may be split into many or spawn many children, no additional FlowFiles will be + * allowed to enter the Process Group through a Local Input Port until the previous FlowFile - and all of its + * child/descendent FlowFiles - have been processed. In a clustered instance, each node may allow through + * a single FlowFile at a time, so multiple FlowFiles may still be processed concurrently across the cluster. + */ + SINGLE_FLOWFILE_PER_NODE, + + /** + * The number of FlowFiles that can be processed concurrently is unbounded. + */ + UNBOUNDED; + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java new file mode 100644 index 0000000000..e700851883 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileGate.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +public interface FlowFileGate { + + boolean tryClaim(); + + void releaseClaim(); + +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java new file mode 100644 index 0000000000..767dfae478 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/FlowFileOutboundPolicy.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +public enum FlowFileOutboundPolicy { + + /** + * FlowFiles that are queued up to be transferred out of a ProcessGroup by an Output Port will be transferred + * out of the Process Group as soon as they are available. + */ + STREAM_WHEN_AVAILABLE, + + /** + * FlowFiles that are queued up to be transferred out of a Process Group by an Output Port will remain queued until + * all FlowFiles in the Process Group are ready to be transferred out of the group. The FlowFiles will then be transferred + * out of the group. I.e., the FlowFiles will be batched together and transferred at the same time (not necessarily in a single + * Process Session) but no FlowFile will be transferred until all FlowFiles in the group are ready to be transferred. + */ + BATCH_OUTPUT; +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index a386b49d51..3e111cfcf7 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -1062,4 +1062,46 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @param updatedParameters a Map of parameter name to the ParameterUpdate that describes how the Parameter was updated */ void onParameterContextUpdated(Map updatedParameters); + + /** + * @return the FlowFileGate that must be used for obtaining a claim before an InputPort is allowed to bring data into a ProcessGroup + */ + FlowFileGate getFlowFileGate(); + + /** + * @return the FlowFileConcurrency that is currently configured for the ProcessGroup + */ + FlowFileConcurrency getFlowFileConcurrency(); + + /** + * Sets the FlowFileConcurrency to use for this ProcessGroup + * @param flowFileConcurrency the FlowFileConcurrency to use + */ + void setFlowFileConcurrency(FlowFileConcurrency flowFileConcurrency); + + /** + * @return the FlowFile Outbound Policy that governs the behavior of this Process Group + */ + FlowFileOutboundPolicy getFlowFileOutboundPolicy(); + + /** + * Specifies the FlowFile Outbound Policy that should be applied to this Process Group + * @param outboundPolicy the policy to enforce. + */ + void setFlowFileOutboundPolicy(FlowFileOutboundPolicy outboundPolicy); + + /** + * @return true if at least one FlowFile resides in a FlowFileQueue in this Process Group or a child ProcessGroup, false otherwise + */ + boolean isDataQueued(); + + /** + * Indicates whether or not data is queued for Processing. Data is considered queued for processing if it is enqueued in a Connection and + * the destination of that Connection is not an Output Port, OR if the data is enqueued within a child group, regardless of whether or not it is + * queued before an Output Port. I.e., any data that is enqueued in this Process Group is enqueued for Processing unless it is ready to be transferred + * out of this Process Group. + * + * @return true if there is data that is queued for Processing, false otherwise + */ + boolean isDataQueuedForProcessing(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index 7f1173e486..c636f921c3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -20,11 +20,16 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileGate; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; @@ -39,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * NiFi. */ public class LocalPort extends AbstractPort { + private static final Logger logger = LoggerFactory.getLogger(LocalPort.class); // "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure // the number of concurrent tasks to schedule for local ports and funnels. @@ -51,7 +57,7 @@ public class LocalPort extends AbstractPort { private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); - final int maxIterations; + private final int maxIterations; public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) { super(id, name, type, scheduler); @@ -61,8 +67,12 @@ public class LocalPort extends AbstractPort { int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000")); maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0)); + setYieldPeriod(nifiProperties.getBoredYieldDuration()); } + protected int getMaxIterations() { + return maxIterations; + } private boolean[] validateConnections() { // LocalPort requires both in/out. @@ -109,31 +119,107 @@ public class LocalPort extends AbstractPort { public void onTrigger(final ProcessContext context, final ProcessSession session) { readLock.lock(); try { - Set available = context.getAvailableRelationships(); - int iterations = 0; - while (!available.isEmpty()) { - final List flowFiles = session.get(1000); - if (flowFiles.isEmpty()) { - break; - } - - session.transfer(flowFiles, Relationship.ANONYMOUS); - session.commit(); - - // If there are fewer than 1,000 FlowFiles available to transfer, or if we - // have hit the configured FlowFile cap, we want to stop. This prevents us from - // holding the Timer-Driven Thread for an excessive amount of time. - if (flowFiles.size() < 1000 || ++iterations >= maxIterations) { - break; - } - - available = context.getAvailableRelationships(); + if (getConnectableType() == ConnectableType.OUTPUT_PORT) { + triggerOutputPort(context, session); + } else { + triggerInputPort(context, session); } } finally { readLock.unlock(); } } + private void triggerOutputPort(final ProcessContext context, final ProcessSession session) { + final boolean shouldTransfer = isTransferDataOut(); + if (shouldTransfer) { + transferUnboundedConcurrency(context, session); + } else { + context.yield(); + } + } + + private void triggerInputPort(final ProcessContext context, final ProcessSession session) { + final FlowFileGate flowFileGate = getProcessGroup().getFlowFileGate(); + final boolean obtainedClaim = flowFileGate.tryClaim(); + if (!obtainedClaim) { + logger.trace("{} failed to obtain claim for FlowFileGate. Will yield and will not transfer any FlowFiles", this); + context.yield(); + return; + } + + try { + logger.trace("{} obtained claim for FlowFileGate", this); + + final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency(); + switch (flowFileConcurrency) { + case UNBOUNDED: + transferUnboundedConcurrency(context, session); + break; + case SINGLE_FLOWFILE_PER_NODE: + transferSingleFlowFile(session); + break; + } + } finally { + flowFileGate.releaseClaim(); + logger.trace("{} released claim for FlowFileGate", this); + } + } + + private boolean isTransferDataOut() { + final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency(); + if (flowFileConcurrency == FlowFileConcurrency.UNBOUNDED) { + logger.trace("{} will transfer data out of Process Group because FlowFile Concurrency is Unbounded", this); + return true; + } + + final FlowFileOutboundPolicy outboundPolicy = getProcessGroup().getFlowFileOutboundPolicy(); + if (outboundPolicy == FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE) { + logger.trace("{} will transfer data out of Process Group because FlowFile Outbound Policy is Stream When Available", this); + return true; + } + + final boolean queuedForProcessing = getProcessGroup().isDataQueuedForProcessing(); + if (queuedForProcessing) { + logger.trace("{} will not transfer data out of Process Group because FlowFile Outbound Policy is Batch Output and there is data queued for Processing", this); + return false; + } + + logger.trace("{} will transfer data out of Process Group because there is no data queued for processing", this); + return true; + } + + private void transferSingleFlowFile(final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + session.transfer(flowFile, Relationship.ANONYMOUS); + } + + protected void transferUnboundedConcurrency(final ProcessContext context, final ProcessSession session) { + Set available = context.getAvailableRelationships(); + int iterations = 0; + while (!available.isEmpty()) { + final List flowFiles = session.get(1000); + if (flowFiles.isEmpty()) { + break; + } + + session.transfer(flowFiles, Relationship.ANONYMOUS); + session.commit(); + + // If there are fewer than 1,000 FlowFiles available to transfer, or if we + // have hit the configured FlowFile cap, we want to stop. This prevents us from + // holding the Timer-Driven Thread for an excessive amount of time. + if (flowFiles.size() < 1000 || ++iterations >= maxIterations) { + break; + } + + available = context.getAvailableRelationships(); + } + } + @Override public void updateConnection(final Connection connection) throws IllegalStateException { writeLock.lock(); @@ -223,4 +309,9 @@ public class LocalPort extends AbstractPort { public String getComponentType() { return "Local Port"; } + + @Override + public String toString() { + return "LocalPort[id=" + getIdentifier() + ", type=" + getConnectableType() + "]"; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java index a65583f8e5..85e598485e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java @@ -31,6 +31,8 @@ import org.apache.nifi.controller.queue.FlowFileQueue; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; @@ -454,6 +456,17 @@ public class StandardFlowSnippet implements FlowSnippet { childGroup.setPosition(toPosition(groupDTO.getPosition())); childGroup.setComments(groupDTO.getComments()); childGroup.setName(groupDTO.getName()); + + final String flowfileConcurrentName = groupDTO.getFlowfileConcurrency(); + if (flowfileConcurrentName != null) { + childGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrentName)); + } + + final String outboundPolicyName = groupDTO.getFlowfileOutboundPolicy(); + if (outboundPolicyName != null) { + childGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(outboundPolicyName)); + } + if (groupDTO.getVariables() != null) { childGroup.setVariables(groupDTO.getVariables()); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java index c9460f8d48..7bb5052696 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java @@ -57,6 +57,8 @@ import org.apache.nifi.controller.service.ControllerServiceState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.events.BulletinFactory; import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; @@ -1156,6 +1158,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { final String name = dto.getName(); final PositionDTO position = dto.getPosition(); final String comments = dto.getComments(); + final String flowfileConcurrencyName = dto.getFlowfileConcurrency(); + final String flowfileOutboundPolicyName = dto.getFlowfileOutboundPolicy(); if (name != null) { group.setName(name); @@ -1167,6 +1171,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { group.setComments(comments); } + if (flowfileConcurrencyName == null) { + group.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED); + } else { + group.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName)); + } + + if (flowfileOutboundPolicyName == null) { + group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE); + } else { + group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName)); + } + final ParameterContextReferenceEntity parameterContextReference = dto.getParameterContext(); if (parameterContextReference != null && parameterContextReference.getId() != null) { final String parameterContextId = parameterContextReference.getId(); @@ -1274,6 +1290,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer { parentGroup.addProcessGroup(processGroup); } + final String flowfileConcurrencyName = processGroupDTO.getFlowfileConcurrency(); + final String flowfileOutboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy(); + if (flowfileConcurrencyName == null) { + processGroup.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED); + } else { + processGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName)); + } + + if (flowfileOutboundPolicyName == null) { + processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE); + } else { + processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName)); + } + + final String parameterContextId = getString(processGroupElement, "parameterContextId"); if (parameterContextId != null) { final ParameterContext parameterContext = controller.getFlowManager().getParameterContextManager().getParameterContext(parameterContextId); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java index e64e573bfd..84d8cf4a71 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/FlowFromDOMFactory.java @@ -183,6 +183,8 @@ public class FlowFromDOMFactory { dto.setName(getString(element, "name")); dto.setPosition(getPosition(DomUtils.getChild(element, "position"))); dto.setComments(getString(element, "comment")); + dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency")); + dto.setFlowfileOutboundPolicy(getString(element, "flowfileOutboundPolicy")); final Map variables = new HashMap<>(); final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable"); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java index a9b203e9ac..920ccf5a3a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java @@ -234,6 +234,8 @@ public class StandardFlowSerializer implements FlowSerializer { addTextElement(element, "name", group.getName()); addPosition(element, group.getPosition()); addTextElement(element, "comment", group.getComments()); + addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name()); + addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name()); final VersionControlInformation versionControlInfo = group.getVersionControlInformation(); if (versionControlInfo != null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java index c3c95e60cf..7a11c51b5d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java @@ -350,6 +350,8 @@ public class FingerprintFactory { appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId")); appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency")); + appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy")); final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation"); if (versionControlInfo == null) { diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java new file mode 100644 index 0000000000..da1041afcf --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; + +public class SingleConcurrencyFlowFileGate implements FlowFileGate { + private final BooleanSupplier groupEmptyCheck; + private final AtomicBoolean claimed = new AtomicBoolean(false); + + public SingleConcurrencyFlowFileGate(final BooleanSupplier groupEmptyCheck) { + this.groupEmptyCheck = groupEmptyCheck; + } + + @Override + public boolean tryClaim() { + // Check if the claim is already held and atomically set it to being held. + final boolean alreadyClaimed = claimed.getAndSet(true); + if (alreadyClaimed) { + // If claim was already held, then this thread failed to obtain the claim. + return false; + } + + // The claim is now held by this thread. Check if the ProcessGroup is empty. + final boolean empty = groupEmptyCheck.getAsBoolean(); + if (empty) { + // Process Group is empty so return true indicating that the claim is now held. + return true; + } + + // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false, + // indicating that the caller did not obtain the claim. + claimed.set(false); + return false; + } + + @Override + public void releaseClaim() { + claimed.set(false); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 0c091d8f66..da535275a8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -196,6 +196,10 @@ public final class StandardProcessGroup implements ProcessGroup { private final VersionControlFields versionControlFields = new VersionControlFields(); private volatile ParameterContext parameterContext; + private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED; + private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate(); + private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE; + private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); private final Lock writeLock = rwLock.writeLock(); @@ -5280,4 +5284,93 @@ public final class StandardProcessGroup implements ProcessGroup { } } } + + @Override + public FlowFileGate getFlowFileGate() { + return flowFileGate; + } + + @Override + public FlowFileConcurrency getFlowFileConcurrency() { + readLock.lock(); + try { + return flowFileConcurrency; + } finally { + readLock.unlock(); + } + } + + @Override + public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) { + writeLock.lock(); + try { + if (this.flowFileConcurrency == flowFileConcurrency) { + return; + } + + this.flowFileConcurrency = flowFileConcurrency; + switch (flowFileConcurrency) { + case UNBOUNDED: + flowFileGate = new UnboundedFlowFileGate(); + break; + case SINGLE_FLOWFILE_PER_NODE: + flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued()); + break; + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean isDataQueued() { + return isDataQueued(connection -> true); + } + + @Override + public boolean isDataQueuedForProcessing() { + // Data is queued for processing if a connection has data queued and the connection's destination is NOT an Output Port. + return isDataQueued(connection -> connection.getDestination().getConnectableType() != ConnectableType.OUTPUT_PORT); + } + + private boolean isDataQueued(final Predicate connectionFilter) { + readLock.lock(); + try { + for (final Connection connection : this.connections.values()) { + // If the connection doesn't pass the filter, just skip over it. + if (!connectionFilter.test(connection)) { + continue; + } + + final boolean queueEmpty = connection.getFlowFileQueue().isEmpty(); + if (!queueEmpty) { + return true; + } + } + + for (final ProcessGroup child : this.processGroups.values()) { + // Check if the child Process Group has any data enqueued. Note that we call #isDataQueued here and NOT + // #isDataQueeudForProcesing. I.e., regardless of whether this is called from #isDataQueued or #isDataQueuedForProcessing, + // for child groups, we only call #isDataQueued. This is because if data is queued up for the Output Port of a child group, + // it is still considered to be data that is being processed by this Process Group. + if (child.isDataQueued()) { + return true; + } + } + + return false; + } finally { + readLock.unlock(); + } + } + + @Override + public FlowFileOutboundPolicy getFlowFileOutboundPolicy() { + return flowFileOutboundPolicy; + } + + @Override + public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) { + this.flowFileOutboundPolicy = flowFileOutboundPolicy; + } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java new file mode 100644 index 0000000000..352053beb1 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/UnboundedFlowFileGate.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +public class UnboundedFlowFileGate implements FlowFileGate { + @Override + public boolean tryClaim() { + return true; + } + + @Override + public void releaseClaim() { + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd index ca43ebce19..36d7d92d9a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/resources/FlowConfiguration.xsd @@ -178,6 +178,8 @@ + + @@ -203,6 +205,21 @@ + + + + + + + + + + + + + + + @@ -216,8 +233,7 @@ + - RootProcessGroupType doesn't have versionControlInformation --> @@ -225,6 +241,8 @@ + + diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java index 25b8929cf8..aa6ffe4867 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/connectable/TestLocalPort.java @@ -37,14 +37,14 @@ public class TestLocalPort { public void testDefaultValues() { LocalPort port = getLocalInputPort(); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(10, port.maxIterations); + assertEquals(10, port.getMaxIterations()); } @Test public void testSetConcurrentTasks() { LocalPort port = getLocalInputPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2"); assertEquals(2, port.getMaxConcurrentTasks()); - assertEquals(10, port.maxIterations); + assertEquals(10, port.getMaxIterations()); } @Test @@ -52,27 +52,27 @@ public class TestLocalPort { { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(100, port.maxIterations); + assertEquals(100, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(101, port.maxIterations); + assertEquals(101, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(100, port.maxIterations); + assertEquals(100, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(1, port.maxIterations); + assertEquals(1, port.getMaxIterations()); } { LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1"); assertEquals(1, port.getMaxConcurrentTasks()); - assertEquals(1, port.maxIterations); + assertEquals(1, port.getMaxIterations()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index 2955838fa0..c5f05bc267 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -32,6 +32,9 @@ import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileGate; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; @@ -719,6 +722,48 @@ public class MockProcessGroup implements ProcessGroup { public void onParameterContextUpdated(final Map updatedParameters) { } + @Override + public FlowFileGate getFlowFileGate() { + return new FlowFileGate() { + @Override + public boolean tryClaim() { + return true; + } + + @Override + public void releaseClaim() { + } + }; + } + + @Override + public FlowFileConcurrency getFlowFileConcurrency() { + return FlowFileConcurrency.UNBOUNDED; + } + + @Override + public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) { + } + + @Override + public FlowFileOutboundPolicy getFlowFileOutboundPolicy() { + return FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE; + } + + @Override + public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy outboundPolicy) { + } + + @Override + public boolean isDataQueued() { + return false; + } + + @Override + public boolean isDataQueuedForProcessing() { + return false; + } + @Override public void terminateProcessor(ProcessorNode processor) { } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java index 2557241d8e..9d70f9e63a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java @@ -2477,6 +2477,8 @@ public final class DtoFactory { dto.setName(group.getName()); dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null)); dto.setVersionControlInformation(createVersionControlInformationDto(group)); + dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name()); + dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name()); final ParameterContext parameterContext = group.getParameterContext(); if (parameterContext != null) { @@ -4284,6 +4286,8 @@ public final class DtoFactory { copy.setOutputPortCount(original.getOutputPortCount()); copy.setParentGroupId(original.getParentGroupId()); copy.setVersionedComponentId(original.getVersionedComponentId()); + copy.setFlowfileConcurrency(original.getFlowfileConcurrency()); + copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy()); copy.setRunningCount(original.getRunningCount()); copy.setStoppedCount(original.getStoppedCount()); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java index b5f5bcfae2..ba2c6c232b 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/dao/impl/StandardProcessGroupDAO.java @@ -25,6 +25,8 @@ import org.apache.nifi.controller.ScheduledState; import org.apache.nifi.controller.flow.FlowManager; import org.apache.nifi.controller.service.ControllerServiceNode; import org.apache.nifi.controller.service.ControllerServiceState; +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.parameter.ParameterContext; @@ -335,6 +337,11 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou final String name = processGroupDTO.getName(); final String comments = processGroupDTO.getComments(); + final String concurrencyName = processGroupDTO.getFlowfileConcurrency(); + final FlowFileConcurrency flowFileConcurrency = concurrencyName == null ? null : FlowFileConcurrency.valueOf(concurrencyName); + + final String outboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy(); + final FlowFileOutboundPolicy flowFileOutboundPolicy = outboundPolicyName == null ? null : FlowFileOutboundPolicy.valueOf(outboundPolicyName); final ParameterContextReferenceEntity parameterContextReference = processGroupDTO.getParameterContext(); if (parameterContextReference != null) { @@ -364,7 +371,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou if (isNotNull(comments)) { group.setComments(comments); } - + if (flowFileConcurrency != null) { + group.setFlowFileConcurrency(flowFileConcurrency); + } + if (flowFileOutboundPolicy != null) { + group.setFlowFileOutboundPolicy(flowFileOutboundPolicy); + } group.onComponentModified(); return group; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp index b21ba0c102..eb35513796 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/process-group-configuration.jsp @@ -54,6 +54,24 @@ +
+
Process group FlowFile concurrency
+
+
+
+
+ +
+
+
+
Process group outbound policy
+
+
+
+
+ +
+
Apply
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css index ac968b4311..b6322f6195 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/process-group-configuration.css @@ -85,6 +85,14 @@ width: 328px; } +#process-group-flowfile-concurrency-combo { + width: 328px; +} + +#process-group-outbound-policy-combo { + width: 328px; +} + #process-group-comments { height: 100px; } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js index c4dbfd8c4f..79af2aba0c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-process-group-configuration.js @@ -105,7 +105,9 @@ 'comments': $('#process-group-comments').val(), 'parameterContext': { 'id': $('#process-group-parameter-context-combo').combo('getSelectedOption').value - } + }, + 'flowfileConcurrency': $('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value, + 'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value } }; @@ -212,6 +214,41 @@ // populate the process group settings $('#process-group-name').removeClass('unset').val(processGroup.name); $('#process-group-comments').removeClass('unset').val(processGroup.comments); + $('#process-group-flowfile-concurrency-combo').removeClass('unset').combo({ + options: [{ + text: 'Single FlowFile Per Node', + value: 'SINGLE_FLOWFILE_PER_NODE', + description: 'Only a single FlowFile is to be allowed to enter the Process Group at a time on each node in the cluster. While that FlowFile may be split into many or ' + + 'spawn many children, no additional FlowFiles will be allowed to enter the Process Group through a Local Input Port until the previous FlowFile ' + + '- and all of its child/descendent FlowFiles - have been processed.' + }, { + text: 'Unbounded', + value: 'UNBOUNDED', + description: 'The number of FlowFiles that can be processed concurrently is unbounded.' + }], + selectedOption: { + value: processGroup.flowfileConcurrency + } + }); + + $('#process-group-outbound-policy-combo').removeClass('unset').combo({ + options: [{ + text: 'Stream When Available', + value: 'STREAM_WHEN_AVAILABLE', + description: 'FlowFiles that are queued up to be transferred out of a ProcessGroup by an Output Port will be transferred out ' + + 'of the Process Group as soon as they are available.' + }, { + text: 'Batch Output', + value: 'BATCH_OUTPUT', + description: 'FlowFiles that are queued up to be transferred out of a Process Group by an Output Port will remain queued until ' + + 'all FlowFiles in the Process Group are ready to be transferred out of the group. The FlowFiles will then be transferred ' + + 'out of the group. This setting will be ignored if the FlowFile Concurrency is Unbounded.' + }], + selectedOption: { + value: processGroup.flowfileOutboundPolicy + } + }); + // populate the header $('#process-group-configuration-header-text').text(processGroup.name + ' Configuration'); @@ -228,6 +265,12 @@ $('#read-only-process-group-name').text(processGroup.name); $('#read-only-process-group-comments').text(processGroup.comments); + var concurrencyName = processGroup.flowfileConcurrency == "UNBOUNDED" ? "Unbounded" : "Single FlowFile Per Node"; + $('#read-only-process-group-flowfile-concurrency').text(concurrencyName); + + var outboundPolicyName = processGroup.flowfileOutboundPolicy == "BATCH_OUTPUT" ? "Batch Output" : "Stream When Available"; + $('#read-only-process-group-outbound-policy').text(outboundPolicyName); + // populate the header $('#process-group-configuration-header-text').text(processGroup.name + ' Configuration'); } else { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index a9806a0c3c..3c5d4f960e 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -17,6 +17,7 @@ package org.apache.nifi.tests.system; import org.apache.nifi.cluster.coordination.node.NodeConnectionState; +import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.queue.LoadBalanceCompression; import org.apache.nifi.controller.queue.LoadBalanceStrategy; import org.apache.nifi.controller.queue.QueueSize; @@ -380,12 +381,21 @@ public class NiFiClientUtil { return counterValues; } + public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException { + final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity(); + scheduleComponentsEntity.setId(groupId); + scheduleComponentsEntity.setState("RUNNING"); + final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents("root", scheduleComponentsEntity); + + return scheduleEntity; + } + public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException { final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity(); - scheduleComponentsEntity.setId("root"); + scheduleComponentsEntity.setId(groupId); scheduleComponentsEntity.setState("STOPPED"); final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents("root", scheduleComponentsEntity); - waitForProcessorsStopped("root"); + waitForProcessorsStopped(groupId); return scheduleEntity; } @@ -536,6 +546,18 @@ public class NiFiClientUtil { } } + public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination) throws NiFiClientException, IOException { + return createConnection(source, destination, Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName())); + } + + public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination) throws NiFiClientException, IOException { + return createConnection(source, destination, Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName())); + } + + public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final String relationship) throws NiFiClientException, IOException { + return createConnection(source, destination, Collections.singleton(relationship)); + } + public ConnectionEntity createConnection(final ProcessorEntity source, final ProcessorEntity destination, final String relationship) throws NiFiClientException, IOException { return createConnection(source, destination, Collections.singleton(relationship)); } @@ -548,27 +570,41 @@ public class NiFiClientUtil { return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); } + public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final Set relationships) throws NiFiClientException, IOException { + return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); + } + + public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination, final Set relationships) throws NiFiClientException, IOException { + return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); + } + + public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination, final Set relationships) throws NiFiClientException, IOException { + return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships); + } + public ConnectionEntity createConnection(final ConnectableDTO source, final ConnectableDTO destination, final Set relationships) throws NiFiClientException, IOException { + final String groupId = "OUTPUT_PORT".equals(source.getType()) ? destination.getGroupId() : source.getGroupId(); + final ConnectionDTO connectionDto = new ConnectionDTO(); connectionDto.setSelectedRelationships(relationships); connectionDto.setDestination(destination); connectionDto.setSource(source); - connectionDto.setParentGroupId(source.getGroupId()); + connectionDto.setParentGroupId(groupId); final ConnectionEntity connectionEntity = new ConnectionEntity(); connectionEntity.setComponent(connectionDto); connectionEntity.setDestinationGroupId(destination.getGroupId()); connectionEntity.setDestinationId(destination.getId()); - connectionEntity.setDestinationType("PROCESSOR"); + connectionEntity.setDestinationType(destination.getType()); connectionEntity.setSourceGroupId(source.getGroupId()); connectionEntity.setSourceId(source.getId()); - connectionEntity.setDestinationType("PROCESSOR"); + connectionEntity.setSourceType(source.getType()); connectionEntity.setRevision(createNewRevision()); - return nifiClient.getConnectionClient().createConnection(source.getGroupId(), connectionEntity); + return nifiClient.getConnectionClient().createConnection(groupId, connectionEntity); } public ConnectableDTO createConnectableDTO(final ProcessorEntity processor) { @@ -778,6 +814,30 @@ public class NiFiClientUtil { return childGroup; } + public PortEntity createInputPort(final String name, final String groupId) throws NiFiClientException, IOException { + final PortDTO component = new PortDTO(); + component.setName(name); + component.setParentGroupId(groupId); + + final PortEntity inputPortEntity = new PortEntity(); + inputPortEntity.setRevision(createNewRevision()); + inputPortEntity.setComponent(component); + + return nifiClient.getInputPortClient().createInputPort(groupId, inputPortEntity); + } + + public PortEntity createOutputPort(final String name, final String groupId) throws NiFiClientException, IOException { + final PortDTO component = new PortDTO(); + component.setName(name); + component.setParentGroupId(groupId); + + final PortEntity outputPortEntity = new PortEntity(); + outputPortEntity.setRevision(createNewRevision()); + outputPortEntity.setComponent(component); + + return nifiClient.getOutputPortClient().createOutputPort(groupId, outputPortEntity); + } + public ProvenanceEntity queryProvenance(final Map searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException { final Map searchTermsAsStrings = searchTerms.entrySet().stream() .collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue)); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index 1e37b018d9..a57d4ea5b5 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -30,18 +30,26 @@ import org.junit.Before; import org.junit.Rule; import org.junit.rules.TestName; import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BooleanSupplier; import java.util.regex.Matcher; import java.util.regex.Pattern; public abstract class NiFiSystemIT { + private static final Logger logger = LoggerFactory.getLogger(NiFiSystemIT.class); + private final ConcurrentMap lastLogTimestamps = new ConcurrentHashMap<>(); + public static final int CLIENT_API_PORT = 5671; public static final String NIFI_GROUP_ID = "org.apache.nifi"; public static final String TEST_EXTENSIONS_ARTIFACT_ID = "nifi-system-test-extensions-nar"; @@ -131,6 +139,8 @@ public abstract class NiFiSystemIT { } protected void waitForAllNodesConnected(final int expectedNumberOfNodes, final long sleepMillis) { + logger.info("Waiting for {} nodes to connect", expectedNumberOfNodes); + final NiFiClient client = getNifiClient(); final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60); @@ -142,6 +152,8 @@ public abstract class NiFiSystemIT { return; } + logEverySecond("Waiting for {} nodes to connect but currently on {} nodes are connected", expectedNumberOfNodes, connectedNodeCount); + if (System.currentTimeMillis() > maxTime) { throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); } @@ -262,10 +274,27 @@ public abstract class NiFiSystemIT { } protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException { + logger.info("Waiting for Queue Count of {} on Connection {}", queueSize, connectionId); + waitFor(() -> { final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId); - return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == queueSize; + final int currentSize = statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued(); + final String sourceName = statusEntity.getConnectionStatus().getSourceName(); + final String destinationName = statusEntity.getConnectionStatus().getDestinationName(); + logEverySecond("Current Queue Size for Connection from {} to {} = {}, Waiting for {}", sourceName, destinationName, currentSize, queueSize); + + return currentSize == queueSize; }); + + logger.info("Queue Count for Connection {} is now {}", connectionId, queueSize); + } + + private void logEverySecond(final String message, final Object... args) { + final Long lastLogTime = lastLogTimestamps.get(message); + if (lastLogTime == null || lastLogTime < System.currentTimeMillis() - 1000L) { + logger.info(message, args); + lastLogTimestamps.put(message, System.currentTimeMillis()); + } } private ConnectionStatusEntity getConnectionStatus(final String connectionId) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java new file mode 100644 index 0000000000..02e8bbc787 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.pg; + +import org.apache.nifi.groups.FlowFileConcurrency; +import org.apache.nifi.groups.FlowFileOutboundPolicy; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.PortEntity; +import org.apache.nifi.web.api.entity.ProcessGroupEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { + + + @Test + public void testSingleConcurrency() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); + final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); + final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("Batch Size", "3")); + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + + final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupEntity.getId()); + getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "5 sec")); + + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Connect Generate -> Input Port -> Sleep -> Output Port -> Terminate + // Since we will use Single FlowFile at a time concurrency, we should see that the connection between Input Port and Sleep + // never has more than 1 FlowFile even though the Sleep processor takes a long time. + final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success"); + final ConnectionEntity inputToSleep = getClientUtil().createConnection(inputPort, sleep); + getClientUtil().createConnection(sleep, outputPort, "success"); + final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate); + + processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name()); + getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity); + + // Start all components except for Terminate. We want the data to queue up before terminate so we can ensure that the + // correct number of FlowFiles are queued up. + getClientUtil().startProcessGroupComponents("root"); + getNifiClient().getProcessorClient().stopProcessor(terminate); + + // Wait for 1 FlowFile to queue up for the Sleep Processor. This should leave 2 FlowFiles queued up for the input port. + waitForQueueCount(inputToSleep.getId(), 1); + assertEquals(2, getConnectionQueueSize(generateToInput.getId())); + + // Wait until only 1 FlowFile is queued up for the input port. Because Sleep should take 5 seconds to complete its job, + // It should take 5 seconds for this to happen. But it won't be exact. So we'll ensure that it takes at least 3 seconds. We could + // put an upper bound such as 6 or 7 seconds as well, but it's a good idea to avoid that because the tests may run in some environments + // with constrained resources that may take a lot longer to run. + final long startTime = System.currentTimeMillis(); + waitForQueueCount(generateToInput.getId(), 1); + final long endTime = System.currentTimeMillis(); + final long delay = endTime - startTime; + assertTrue(delay > 3000L); + + assertEquals(1, getConnectionQueueSize(inputToSleep.getId())); + + waitForQueueCount(outputToTerminate.getId(), 2); + + // Wait until all FlowFiles have been ingested. + waitForQueueCount(generateToInput.getId(), 0); + assertEquals(1, getConnectionQueueSize(inputToSleep.getId())); + + // Ensure that 3 FlowFiles are queued up for Terminate + waitForQueueCount(outputToTerminate.getId(), 3); + } + + + @Test + public void testSingleConcurrencyAndBatchOutput() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); + final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); + final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); + final PortEntity secondOut = getClientUtil().createOutputPort("Out2", processGroupEntity.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + + final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupEntity.getId()); // sleep with default configuration is just a simple pass-through + + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Connect Generate -> Input Port -> Count -> Output Port -> Terminate + // Also connect InputPort -> Out2 -> Terminate + final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success"); + final ConnectionEntity inputToSleep = getClientUtil().createConnection(inputPort, sleep); + final ConnectionEntity sleepToOutput = getClientUtil().createConnection(sleep, outputPort, "success"); + final ConnectionEntity inputToSecondOut = getClientUtil().createConnection(inputPort, secondOut); + final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate); + final ConnectionEntity secondOutToTerminate = getClientUtil().createConnection(secondOut, terminate); + + processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name()); + processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name()); + getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity); + + // Start generate so that data is created. Start Input Port so that the data is ingested. + // Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor, + // and that should prevent data from being transferred by Output Port "Out2" also. + getNifiClient().getProcessorClient().startProcessor(generate); + getNifiClient().getInputPortClient().startInputPort(inputPort); + getNifiClient().getOutputPortClient().startOutputPort(outputPort); + getNifiClient().getOutputPortClient().startOutputPort(secondOut); + + waitForQueueCount(inputToSleep.getId(), 1); + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + + // Wait 3 seconds to ensure that data is never transferred + for (int i=0; i < 3; i++) { + Thread.sleep(1000L); + assertEquals(1, getConnectionQueueSize(inputToSleep.getId())); + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + } + + // Start Sleep + getNifiClient().getProcessorClient().startProcessor(sleep); + + // Data should now flow from both output ports. + waitForQueueCount(inputToSleep.getId(), 0); + waitForQueueCount(inputToSecondOut.getId(), 0); + + assertEquals(1, getConnectionQueueSize(outputToTerminate.getId())); + assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId())); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java index b635404e2c..1bfab8778a 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/OutputPortClient.java @@ -30,7 +30,13 @@ public interface OutputPortClient { PortEntity deleteOutputPort(PortEntity entity) throws NiFiClientException, IOException; + /** + * @deprecated use startOutputPort + */ + @Deprecated PortEntity startInpuOutputPort(PortEntity entity) throws NiFiClientException, IOException; + PortEntity startOutputPort(PortEntity entity) throws NiFiClientException, IOException; + PortEntity stopOutputPort(PortEntity entity) throws NiFiClientException, IOException; } diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java index 46637fcf56..645c95f9a2 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyOutputPortClient.java @@ -62,6 +62,11 @@ public class JerseyOutputPortClient extends CRUDJerseyClient impleme @Override public PortEntity startInpuOutputPort(final PortEntity entity) throws NiFiClientException, IOException { + return startOutputPort(entity); + } + + @Override + public PortEntity startOutputPort(final PortEntity entity) throws NiFiClientException, IOException { final PortEntity startEntity = createStateEntity(entity, "RUNNING"); return updateOutputPort(startEntity); }