NIFI-7476: Implemented FlowFileGating / FlowFileConcurrency at the ProcessGroup level

Added FlowFileOutboundPolicy to ProcessGroups and updated LocalPort to make use of it
Persisted FlowFile Concurrency and FlowFile Output Policy to flow.xml.gz and included in flow fingerprint
Added configuration for FlowFile concurrency and outbound policy to UI for configuration of Process Groups
Added system tests. Fixed a couple of bugs that were found
Fixed a couple of typos in the RecordPath guide

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4306.
This commit is contained in:
Mark Payne 2020-05-20 17:09:40 -04:00 committed by Pierre Villard
parent 915617dbe7
commit 359fd3ff29
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
27 changed files with 921 additions and 38 deletions

View File

@ -34,6 +34,8 @@ public class ProcessGroupDTO extends ComponentDTO {
private Map<String, String> variables;
private VersionControlInformationDTO versionControlInformation;
private ParameterContextReferenceEntity parameterContext;
private String flowfileConcurrency;
private String flowfileOutboundPolicy;
private Integer runningCount;
private Integer stoppedCount;
@ -352,4 +354,23 @@ public class ProcessGroupDTO extends ComponentDTO {
public void setParameterContext(final ParameterContextReferenceEntity parameterContext) {
this.parameterContext = parameterContext;
}
@ApiModelProperty(value = "The FlowFile Concurrency for this Process Group.", allowableValues = "UNBOUNDED, SINGLE_FLOWFILE_PER_NODE")
public String getFlowfileConcurrency() {
return flowfileConcurrency;
}
public void setFlowfileConcurrency(final String flowfileConcurrency) {
this.flowfileConcurrency = flowfileConcurrency;
}
@ApiModelProperty(value = "The Oubound Policy that is used for determining how FlowFiles should be transferred out of the Process Group.",
allowableValues = "STREAM_WHEN_AVAILABLE, BATCH_OUTPUT")
public String getFlowfileOutboundPolicy() {
return flowfileOutboundPolicy;
}
public void setFlowfileOutboundPolicy(final String flowfileOutboundPolicy) {
this.flowfileOutboundPolicy = flowfileOutboundPolicy;
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
/**
* Specifies the concurrency level of a Process Group
*/
public enum FlowFileConcurrency {
/**
* Only a single FlowFile is to be allowed to enter the Process Group at a time.
* While that FlowFile may be split into many or spawn many children, no additional FlowFiles will be
* allowed to enter the Process Group through a Local Input Port until the previous FlowFile - and all of its
* child/descendent FlowFiles - have been processed. In a clustered instance, each node may allow through
* a single FlowFile at a time, so multiple FlowFiles may still be processed concurrently across the cluster.
*/
SINGLE_FLOWFILE_PER_NODE,
/**
* The number of FlowFiles that can be processed concurrently is unbounded.
*/
UNBOUNDED;
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
public interface FlowFileGate {
boolean tryClaim();
void releaseClaim();
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
public enum FlowFileOutboundPolicy {
/**
* FlowFiles that are queued up to be transferred out of a ProcessGroup by an Output Port will be transferred
* out of the Process Group as soon as they are available.
*/
STREAM_WHEN_AVAILABLE,
/**
* FlowFiles that are queued up to be transferred out of a Process Group by an Output Port will remain queued until
* all FlowFiles in the Process Group are ready to be transferred out of the group. The FlowFiles will then be transferred
* out of the group. I.e., the FlowFiles will be batched together and transferred at the same time (not necessarily in a single
* Process Session) but no FlowFile will be transferred until all FlowFiles in the group are ready to be transferred.
*/
BATCH_OUTPUT;
}

View File

@ -1062,4 +1062,46 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param updatedParameters a Map of parameter name to the ParameterUpdate that describes how the Parameter was updated
*/
void onParameterContextUpdated(Map<String, ParameterUpdate> updatedParameters);
/**
* @return the FlowFileGate that must be used for obtaining a claim before an InputPort is allowed to bring data into a ProcessGroup
*/
FlowFileGate getFlowFileGate();
/**
* @return the FlowFileConcurrency that is currently configured for the ProcessGroup
*/
FlowFileConcurrency getFlowFileConcurrency();
/**
* Sets the FlowFileConcurrency to use for this ProcessGroup
* @param flowFileConcurrency the FlowFileConcurrency to use
*/
void setFlowFileConcurrency(FlowFileConcurrency flowFileConcurrency);
/**
* @return the FlowFile Outbound Policy that governs the behavior of this Process Group
*/
FlowFileOutboundPolicy getFlowFileOutboundPolicy();
/**
* Specifies the FlowFile Outbound Policy that should be applied to this Process Group
* @param outboundPolicy the policy to enforce.
*/
void setFlowFileOutboundPolicy(FlowFileOutboundPolicy outboundPolicy);
/**
* @return true if at least one FlowFile resides in a FlowFileQueue in this Process Group or a child ProcessGroup, false otherwise
*/
boolean isDataQueued();
/**
* Indicates whether or not data is queued for Processing. Data is considered queued for processing if it is enqueued in a Connection and
* the destination of that Connection is not an Output Port, OR if the data is enqueued within a child group, regardless of whether or not it is
* queued before an Output Port. I.e., any data that is enqueued in this Process Group is enqueued for Processing unless it is ready to be transferred
* out of this Process Group.
*
* @return <code>true</code> if there is data that is queued for Processing, <code>false</code> otherwise
*/
boolean isDataQueuedForProcessing();
}

View File

@ -20,11 +20,16 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileGate;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
@ -39,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* NiFi.
*/
public class LocalPort extends AbstractPort {
private static final Logger logger = LoggerFactory.getLogger(LocalPort.class);
// "_nifi.funnel.max.concurrent.tasks" is an experimental NiFi property allowing users to configure
// the number of concurrent tasks to schedule for local ports and funnels.
@ -51,7 +57,7 @@ public class LocalPort extends AbstractPort {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
final int maxIterations;
private final int maxIterations;
public LocalPort(final String id, final String name, final ConnectableType type, final ProcessScheduler scheduler, final NiFiProperties nifiProperties) {
super(id, name, type, scheduler);
@ -61,8 +67,12 @@ public class LocalPort extends AbstractPort {
int maxTransferredFlowFiles = Integer.parseInt(nifiProperties.getProperty(MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "10000"));
maxIterations = Math.max(1, (int) Math.ceil(maxTransferredFlowFiles / 1000.0));
setYieldPeriod(nifiProperties.getBoredYieldDuration());
}
protected int getMaxIterations() {
return maxIterations;
}
private boolean[] validateConnections() {
// LocalPort requires both in/out.
@ -109,31 +119,107 @@ public class LocalPort extends AbstractPort {
public void onTrigger(final ProcessContext context, final ProcessSession session) {
readLock.lock();
try {
Set<Relationship> available = context.getAvailableRelationships();
int iterations = 0;
while (!available.isEmpty()) {
final List<FlowFile> flowFiles = session.get(1000);
if (flowFiles.isEmpty()) {
break;
}
session.transfer(flowFiles, Relationship.ANONYMOUS);
session.commit();
// If there are fewer than 1,000 FlowFiles available to transfer, or if we
// have hit the configured FlowFile cap, we want to stop. This prevents us from
// holding the Timer-Driven Thread for an excessive amount of time.
if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
break;
}
available = context.getAvailableRelationships();
if (getConnectableType() == ConnectableType.OUTPUT_PORT) {
triggerOutputPort(context, session);
} else {
triggerInputPort(context, session);
}
} finally {
readLock.unlock();
}
}
private void triggerOutputPort(final ProcessContext context, final ProcessSession session) {
final boolean shouldTransfer = isTransferDataOut();
if (shouldTransfer) {
transferUnboundedConcurrency(context, session);
} else {
context.yield();
}
}
private void triggerInputPort(final ProcessContext context, final ProcessSession session) {
final FlowFileGate flowFileGate = getProcessGroup().getFlowFileGate();
final boolean obtainedClaim = flowFileGate.tryClaim();
if (!obtainedClaim) {
logger.trace("{} failed to obtain claim for FlowFileGate. Will yield and will not transfer any FlowFiles", this);
context.yield();
return;
}
try {
logger.trace("{} obtained claim for FlowFileGate", this);
final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency();
switch (flowFileConcurrency) {
case UNBOUNDED:
transferUnboundedConcurrency(context, session);
break;
case SINGLE_FLOWFILE_PER_NODE:
transferSingleFlowFile(session);
break;
}
} finally {
flowFileGate.releaseClaim();
logger.trace("{} released claim for FlowFileGate", this);
}
}
private boolean isTransferDataOut() {
final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency();
if (flowFileConcurrency == FlowFileConcurrency.UNBOUNDED) {
logger.trace("{} will transfer data out of Process Group because FlowFile Concurrency is Unbounded", this);
return true;
}
final FlowFileOutboundPolicy outboundPolicy = getProcessGroup().getFlowFileOutboundPolicy();
if (outboundPolicy == FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE) {
logger.trace("{} will transfer data out of Process Group because FlowFile Outbound Policy is Stream When Available", this);
return true;
}
final boolean queuedForProcessing = getProcessGroup().isDataQueuedForProcessing();
if (queuedForProcessing) {
logger.trace("{} will not transfer data out of Process Group because FlowFile Outbound Policy is Batch Output and there is data queued for Processing", this);
return false;
}
logger.trace("{} will transfer data out of Process Group because there is no data queued for processing", this);
return true;
}
private void transferSingleFlowFile(final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
session.transfer(flowFile, Relationship.ANONYMOUS);
}
protected void transferUnboundedConcurrency(final ProcessContext context, final ProcessSession session) {
Set<Relationship> available = context.getAvailableRelationships();
int iterations = 0;
while (!available.isEmpty()) {
final List<FlowFile> flowFiles = session.get(1000);
if (flowFiles.isEmpty()) {
break;
}
session.transfer(flowFiles, Relationship.ANONYMOUS);
session.commit();
// If there are fewer than 1,000 FlowFiles available to transfer, or if we
// have hit the configured FlowFile cap, we want to stop. This prevents us from
// holding the Timer-Driven Thread for an excessive amount of time.
if (flowFiles.size() < 1000 || ++iterations >= maxIterations) {
break;
}
available = context.getAvailableRelationships();
}
}
@Override
public void updateConnection(final Connection connection) throws IllegalStateException {
writeLock.lock();
@ -223,4 +309,9 @@ public class LocalPort extends AbstractPort {
public String getComponentType() {
return "Local Port";
}
@Override
public String toString() {
return "LocalPort[id=" + getIdentifier() + ", type=" + getConnectableType() + "]";
}
}

View File

@ -31,6 +31,8 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@ -454,6 +456,17 @@ public class StandardFlowSnippet implements FlowSnippet {
childGroup.setPosition(toPosition(groupDTO.getPosition()));
childGroup.setComments(groupDTO.getComments());
childGroup.setName(groupDTO.getName());
final String flowfileConcurrentName = groupDTO.getFlowfileConcurrency();
if (flowfileConcurrentName != null) {
childGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrentName));
}
final String outboundPolicyName = groupDTO.getFlowfileOutboundPolicy();
if (outboundPolicyName != null) {
childGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(outboundPolicyName));
}
if (groupDTO.getVariables() != null) {
childGroup.setVariables(groupDTO.getVariables());
}

View File

@ -57,6 +57,8 @@ import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
@ -1156,6 +1158,8 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
final String name = dto.getName();
final PositionDTO position = dto.getPosition();
final String comments = dto.getComments();
final String flowfileConcurrencyName = dto.getFlowfileConcurrency();
final String flowfileOutboundPolicyName = dto.getFlowfileOutboundPolicy();
if (name != null) {
group.setName(name);
@ -1167,6 +1171,18 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
group.setComments(comments);
}
if (flowfileConcurrencyName == null) {
group.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED);
} else {
group.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName));
}
if (flowfileOutboundPolicyName == null) {
group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE);
} else {
group.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
}
final ParameterContextReferenceEntity parameterContextReference = dto.getParameterContext();
if (parameterContextReference != null && parameterContextReference.getId() != null) {
final String parameterContextId = parameterContextReference.getId();
@ -1274,6 +1290,21 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
parentGroup.addProcessGroup(processGroup);
}
final String flowfileConcurrencyName = processGroupDTO.getFlowfileConcurrency();
final String flowfileOutboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy();
if (flowfileConcurrencyName == null) {
processGroup.setFlowFileConcurrency(FlowFileConcurrency.UNBOUNDED);
} else {
processGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrencyName));
}
if (flowfileOutboundPolicyName == null) {
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE);
} else {
processGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(flowfileOutboundPolicyName));
}
final String parameterContextId = getString(processGroupElement, "parameterContextId");
if (parameterContextId != null) {
final ParameterContext parameterContext = controller.getFlowManager().getParameterContextManager().getParameterContext(parameterContextId);

View File

@ -183,6 +183,8 @@ public class FlowFromDOMFactory {
dto.setName(getString(element, "name"));
dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
dto.setComments(getString(element, "comment"));
dto.setFlowfileConcurrency(getString(element, "flowfileConcurrency"));
dto.setFlowfileOutboundPolicy(getString(element, "flowfileOutboundPolicy"));
final Map<String, String> variables = new HashMap<>();
final NodeList variableList = DomUtils.getChildNodesByTagName(element, "variable");

View File

@ -234,6 +234,8 @@ public class StandardFlowSerializer implements FlowSerializer<Document> {
addTextElement(element, "name", group.getName());
addPosition(element, group.getPosition());
addTextElement(element, "comment", group.getComments());
addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name());
addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name());
final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
if (versionControlInfo != null) {

View File

@ -350,6 +350,8 @@ public class FingerprintFactory {
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "versionedComponentId"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "parameterContextId"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileConcurrency"));
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "flowfileOutboundPolicy"));
final Element versionControlInfo = DomUtils.getChild(processGroupElem, "versionControlInformation");
if (versionControlInfo == null) {

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
public class SingleConcurrencyFlowFileGate implements FlowFileGate {
private final BooleanSupplier groupEmptyCheck;
private final AtomicBoolean claimed = new AtomicBoolean(false);
public SingleConcurrencyFlowFileGate(final BooleanSupplier groupEmptyCheck) {
this.groupEmptyCheck = groupEmptyCheck;
}
@Override
public boolean tryClaim() {
// Check if the claim is already held and atomically set it to being held.
final boolean alreadyClaimed = claimed.getAndSet(true);
if (alreadyClaimed) {
// If claim was already held, then this thread failed to obtain the claim.
return false;
}
// The claim is now held by this thread. Check if the ProcessGroup is empty.
final boolean empty = groupEmptyCheck.getAsBoolean();
if (empty) {
// Process Group is empty so return true indicating that the claim is now held.
return true;
}
// Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false,
// indicating that the caller did not obtain the claim.
claimed.set(false);
return false;
}
@Override
public void releaseClaim() {
claimed.set(false);
}
}

View File

@ -196,6 +196,10 @@ public final class StandardProcessGroup implements ProcessGroup {
private final VersionControlFields versionControlFields = new VersionControlFields();
private volatile ParameterContext parameterContext;
private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED;
private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate();
private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
@ -5280,4 +5284,93 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
}
@Override
public FlowFileGate getFlowFileGate() {
return flowFileGate;
}
@Override
public FlowFileConcurrency getFlowFileConcurrency() {
readLock.lock();
try {
return flowFileConcurrency;
} finally {
readLock.unlock();
}
}
@Override
public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) {
writeLock.lock();
try {
if (this.flowFileConcurrency == flowFileConcurrency) {
return;
}
this.flowFileConcurrency = flowFileConcurrency;
switch (flowFileConcurrency) {
case UNBOUNDED:
flowFileGate = new UnboundedFlowFileGate();
break;
case SINGLE_FLOWFILE_PER_NODE:
flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued());
break;
}
} finally {
writeLock.unlock();
}
}
@Override
public boolean isDataQueued() {
return isDataQueued(connection -> true);
}
@Override
public boolean isDataQueuedForProcessing() {
// Data is queued for processing if a connection has data queued and the connection's destination is NOT an Output Port.
return isDataQueued(connection -> connection.getDestination().getConnectableType() != ConnectableType.OUTPUT_PORT);
}
private boolean isDataQueued(final Predicate<Connection> connectionFilter) {
readLock.lock();
try {
for (final Connection connection : this.connections.values()) {
// If the connection doesn't pass the filter, just skip over it.
if (!connectionFilter.test(connection)) {
continue;
}
final boolean queueEmpty = connection.getFlowFileQueue().isEmpty();
if (!queueEmpty) {
return true;
}
}
for (final ProcessGroup child : this.processGroups.values()) {
// Check if the child Process Group has any data enqueued. Note that we call #isDataQueued here and NOT
// #isDataQueeudForProcesing. I.e., regardless of whether this is called from #isDataQueued or #isDataQueuedForProcessing,
// for child groups, we only call #isDataQueued. This is because if data is queued up for the Output Port of a child group,
// it is still considered to be data that is being processed by this Process Group.
if (child.isDataQueued()) {
return true;
}
}
return false;
} finally {
readLock.unlock();
}
}
@Override
public FlowFileOutboundPolicy getFlowFileOutboundPolicy() {
return flowFileOutboundPolicy;
}
@Override
public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) {
this.flowFileOutboundPolicy = flowFileOutboundPolicy;
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.groups;
public class UnboundedFlowFileGate implements FlowFileGate {
@Override
public boolean tryClaim() {
return true;
}
@Override
public void releaseClaim() {
}
}

View File

@ -178,6 +178,8 @@
<xs:element name="name" type="NonEmptyStringType" />
<xs:element name="position" type="PositionType" />
<xs:element name="comment" type="xs:string" />
<xs:element name="flowfileConcurrency" type="FlowFileConcurrencyType" minOccurs="0" maxOccurs="1" />
<xs:element name="flowfileOutboundPolicy" type="FlowFileOutboundPolicyType" minOccurs="0" maxOccurs="1" />
<xs:element name="versionControlInformation" type="VersionControlInformation" minOccurs="0" maxOccurs="1" />
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
@ -203,6 +205,21 @@
<xs:attribute name="value" />
</xs:complexType>
<xs:simpleType name="FlowFileConcurrencyType">
<xs:restriction base="xs:string">
<xs:enumeration value="SINGLE_FLOWFILE_PER_NODE" />
<xs:enumeration value="UNBOUNDED" />
</xs:restriction>
</xs:simpleType>
<xs:simpleType name="FlowFileOutboundPolicyType">
<xs:restriction base="xs:string">
<xs:enumeration value="STREAM_WHEN_AVAILABLE" />
<xs:enumeration value="BATCH_OUTPUT" />
</xs:restriction>
</xs:simpleType>
<xs:complexType name="VersionControlInformation">
<xs:sequence>
<xs:element name="registryId" type="NonEmptyStringType" />
@ -216,8 +233,7 @@
</xs:complexType>
<!-- Same as ProcessGroupType except:
- RootProcessGroupType doesn't have versionControlInformation
- RootProcessGroupType doesn't have variable -->
- RootProcessGroupType doesn't have versionControlInformation -->
<xs:complexType name="RootProcessGroupType">
<xs:sequence>
<xs:element name="id" type="NonEmptyStringType" />
@ -225,6 +241,8 @@
<xs:element name="name" type="NonEmptyStringType" />
<xs:element name="position" type="PositionType" />
<xs:element name="comment" type="xs:string" />
<xs:element name="flowfileConcurrency" type="FlowFileConcurrencyType" minOccurs="0" maxOccurs="1" />
<xs:element name="flowfileOutboundPolicy" type="FlowFileOutboundPolicyType" minOccurs="0" maxOccurs="1" />
<!-- Each "processor" defines the actual dataflow work horses that make dataflow happen-->
<xs:element name="processor" type="ProcessorType" minOccurs="0" maxOccurs="unbounded"/>

View File

@ -37,14 +37,14 @@ public class TestLocalPort {
public void testDefaultValues() {
LocalPort port = getLocalInputPort();
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(10, port.maxIterations);
assertEquals(10, port.getMaxIterations());
}
@Test
public void testSetConcurrentTasks() {
LocalPort port = getLocalInputPort(LocalPort.MAX_CONCURRENT_TASKS_PROP_NAME, "2");
assertEquals(2, port.getMaxConcurrentTasks());
assertEquals(10, port.maxIterations);
assertEquals(10, port.getMaxIterations());
}
@Test
@ -52,27 +52,27 @@ public class TestLocalPort {
{
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100000");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(100, port.maxIterations);
assertEquals(100, port.getMaxIterations());
}
{
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "100001");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(101, port.maxIterations);
assertEquals(101, port.getMaxIterations());
}
{
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "99999");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(100, port.maxIterations);
assertEquals(100, port.getMaxIterations());
}
{
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "0");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(1, port.maxIterations);
assertEquals(1, port.getMaxIterations());
}
{
LocalPort port = getLocalInputPort(LocalPort.MAX_TRANSFERRED_FLOWFILES_PROP_NAME, "1");
assertEquals(1, port.getMaxConcurrentTasks());
assertEquals(1, port.maxIterations);
assertEquals(1, port.getMaxIterations());
}
}

View File

@ -32,6 +32,9 @@ import org.apache.nifi.controller.Snippet;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileGate;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
@ -719,6 +722,48 @@ public class MockProcessGroup implements ProcessGroup {
public void onParameterContextUpdated(final Map<String, ParameterUpdate> updatedParameters) {
}
@Override
public FlowFileGate getFlowFileGate() {
return new FlowFileGate() {
@Override
public boolean tryClaim() {
return true;
}
@Override
public void releaseClaim() {
}
};
}
@Override
public FlowFileConcurrency getFlowFileConcurrency() {
return FlowFileConcurrency.UNBOUNDED;
}
@Override
public void setFlowFileConcurrency(final FlowFileConcurrency flowFileConcurrency) {
}
@Override
public FlowFileOutboundPolicy getFlowFileOutboundPolicy() {
return FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE;
}
@Override
public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy outboundPolicy) {
}
@Override
public boolean isDataQueued() {
return false;
}
@Override
public boolean isDataQueuedForProcessing() {
return false;
}
@Override
public void terminateProcessor(ProcessorNode processor) {
}

View File

@ -2477,6 +2477,8 @@ public final class DtoFactory {
dto.setName(group.getName());
dto.setVersionedComponentId(group.getVersionedComponentId().orElse(null));
dto.setVersionControlInformation(createVersionControlInformationDto(group));
dto.setFlowfileConcurrency(group.getFlowFileConcurrency().name());
dto.setFlowfileOutboundPolicy(group.getFlowFileOutboundPolicy().name());
final ParameterContext parameterContext = group.getParameterContext();
if (parameterContext != null) {
@ -4284,6 +4286,8 @@ public final class DtoFactory {
copy.setOutputPortCount(original.getOutputPortCount());
copy.setParentGroupId(original.getParentGroupId());
copy.setVersionedComponentId(original.getVersionedComponentId());
copy.setFlowfileConcurrency(original.getFlowfileConcurrency());
copy.setFlowfileOutboundPolicy(original.getFlowfileOutboundPolicy());
copy.setRunningCount(original.getRunningCount());
copy.setStoppedCount(original.getStoppedCount());

View File

@ -25,6 +25,8 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.ParameterContext;
@ -335,6 +337,11 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
final String name = processGroupDTO.getName();
final String comments = processGroupDTO.getComments();
final String concurrencyName = processGroupDTO.getFlowfileConcurrency();
final FlowFileConcurrency flowFileConcurrency = concurrencyName == null ? null : FlowFileConcurrency.valueOf(concurrencyName);
final String outboundPolicyName = processGroupDTO.getFlowfileOutboundPolicy();
final FlowFileOutboundPolicy flowFileOutboundPolicy = outboundPolicyName == null ? null : FlowFileOutboundPolicy.valueOf(outboundPolicyName);
final ParameterContextReferenceEntity parameterContextReference = processGroupDTO.getParameterContext();
if (parameterContextReference != null) {
@ -364,7 +371,12 @@ public class StandardProcessGroupDAO extends ComponentDAO implements ProcessGrou
if (isNotNull(comments)) {
group.setComments(comments);
}
if (flowFileConcurrency != null) {
group.setFlowFileConcurrency(flowFileConcurrency);
}
if (flowFileOutboundPolicy != null) {
group.setFlowFileOutboundPolicy(flowFileOutboundPolicy);
}
group.onComponentModified();
return group;
}

View File

@ -54,6 +54,24 @@
<span id="read-only-process-group-comments" class="unset"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">Process group FlowFile concurrency</div>
<div class="editable setting-field">
<div id="process-group-flowfile-concurrency-combo"></div>
</div>
<div class="read-only setting-field">
<span id="read-only-process-group-flowfile-concurrency" class="unset"></span>
</div>
</div>
<div class="setting">
<div class="setting-name">Process group outbound policy</div>
<div class="editable setting-field">
<div id="process-group-outbound-policy-combo"></div>
</div>
<div class="read-only setting-field">
<span id="read-only-process-group-outbound-policy" class="unset"></span>
</div>
</div>
<div class="editable settings-buttons">
<div id="process-group-configuration-save" class="button">Apply</div>
<div class="clear"></div>

View File

@ -85,6 +85,14 @@
width: 328px;
}
#process-group-flowfile-concurrency-combo {
width: 328px;
}
#process-group-outbound-policy-combo {
width: 328px;
}
#process-group-comments {
height: 100px;
}

View File

@ -105,7 +105,9 @@
'comments': $('#process-group-comments').val(),
'parameterContext': {
'id': $('#process-group-parameter-context-combo').combo('getSelectedOption').value
}
},
'flowfileConcurrency': $('#process-group-flowfile-concurrency-combo').combo('getSelectedOption').value,
'flowfileOutboundPolicy': $('#process-group-outbound-policy-combo').combo('getSelectedOption').value
}
};
@ -212,6 +214,41 @@
// populate the process group settings
$('#process-group-name').removeClass('unset').val(processGroup.name);
$('#process-group-comments').removeClass('unset').val(processGroup.comments);
$('#process-group-flowfile-concurrency-combo').removeClass('unset').combo({
options: [{
text: 'Single FlowFile Per Node',
value: 'SINGLE_FLOWFILE_PER_NODE',
description: 'Only a single FlowFile is to be allowed to enter the Process Group at a time on each node in the cluster. While that FlowFile may be split into many or '
+ 'spawn many children, no additional FlowFiles will be allowed to enter the Process Group through a Local Input Port until the previous FlowFile '
+ '- and all of its child/descendent FlowFiles - have been processed.'
}, {
text: 'Unbounded',
value: 'UNBOUNDED',
description: 'The number of FlowFiles that can be processed concurrently is unbounded.'
}],
selectedOption: {
value: processGroup.flowfileConcurrency
}
});
$('#process-group-outbound-policy-combo').removeClass('unset').combo({
options: [{
text: 'Stream When Available',
value: 'STREAM_WHEN_AVAILABLE',
description: 'FlowFiles that are queued up to be transferred out of a ProcessGroup by an Output Port will be transferred out '
+ 'of the Process Group as soon as they are available.'
}, {
text: 'Batch Output',
value: 'BATCH_OUTPUT',
description: 'FlowFiles that are queued up to be transferred out of a Process Group by an Output Port will remain queued until '
+ 'all FlowFiles in the Process Group are ready to be transferred out of the group. The FlowFiles will then be transferred '
+ 'out of the group. This setting will be ignored if the FlowFile Concurrency is Unbounded.'
}],
selectedOption: {
value: processGroup.flowfileOutboundPolicy
}
});
// populate the header
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
@ -228,6 +265,12 @@
$('#read-only-process-group-name').text(processGroup.name);
$('#read-only-process-group-comments').text(processGroup.comments);
var concurrencyName = processGroup.flowfileConcurrency == "UNBOUNDED" ? "Unbounded" : "Single FlowFile Per Node";
$('#read-only-process-group-flowfile-concurrency').text(concurrencyName);
var outboundPolicyName = processGroup.flowfileOutboundPolicy == "BATCH_OUTPUT" ? "Batch Output" : "Stream When Available";
$('#read-only-process-group-outbound-policy').text(outboundPolicyName);
// populate the header
$('#process-group-configuration-header-text').text(processGroup.name + ' Configuration');
} else {

View File

@ -17,6 +17,7 @@
package org.apache.nifi.tests.system;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
@ -380,12 +381,21 @@ public class NiFiClientUtil {
return counterValues;
}
public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
scheduleComponentsEntity.setId(groupId);
scheduleComponentsEntity.setState("RUNNING");
final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents("root", scheduleComponentsEntity);
return scheduleEntity;
}
public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
scheduleComponentsEntity.setId("root");
scheduleComponentsEntity.setId(groupId);
scheduleComponentsEntity.setState("STOPPED");
final ScheduleComponentsEntity scheduleEntity = nifiClient.getFlowClient().scheduleProcessGroupComponents("root", scheduleComponentsEntity);
waitForProcessorsStopped("root");
waitForProcessorsStopped(groupId);
return scheduleEntity;
}
@ -536,6 +546,18 @@ public class NiFiClientUtil {
}
}
public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination) throws NiFiClientException, IOException {
return createConnection(source, destination, Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName()));
}
public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination) throws NiFiClientException, IOException {
return createConnection(source, destination, Collections.singleton(AbstractPort.PORT_RELATIONSHIP.getName()));
}
public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final String relationship) throws NiFiClientException, IOException {
return createConnection(source, destination, Collections.singleton(relationship));
}
public ConnectionEntity createConnection(final ProcessorEntity source, final ProcessorEntity destination, final String relationship) throws NiFiClientException, IOException {
return createConnection(source, destination, Collections.singleton(relationship));
}
@ -548,27 +570,41 @@ public class NiFiClientUtil {
return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships);
}
public ConnectionEntity createConnection(final ProcessorEntity source, final PortEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships);
}
public ConnectionEntity createConnection(final PortEntity source, final PortEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships);
}
public ConnectionEntity createConnection(final PortEntity source, final ProcessorEntity destination, final Set<String> relationships) throws NiFiClientException, IOException {
return createConnection(createConnectableDTO(source), createConnectableDTO(destination), relationships);
}
public ConnectionEntity createConnection(final ConnectableDTO source, final ConnectableDTO destination, final Set<String> relationships) throws NiFiClientException, IOException {
final String groupId = "OUTPUT_PORT".equals(source.getType()) ? destination.getGroupId() : source.getGroupId();
final ConnectionDTO connectionDto = new ConnectionDTO();
connectionDto.setSelectedRelationships(relationships);
connectionDto.setDestination(destination);
connectionDto.setSource(source);
connectionDto.setParentGroupId(source.getGroupId());
connectionDto.setParentGroupId(groupId);
final ConnectionEntity connectionEntity = new ConnectionEntity();
connectionEntity.setComponent(connectionDto);
connectionEntity.setDestinationGroupId(destination.getGroupId());
connectionEntity.setDestinationId(destination.getId());
connectionEntity.setDestinationType("PROCESSOR");
connectionEntity.setDestinationType(destination.getType());
connectionEntity.setSourceGroupId(source.getGroupId());
connectionEntity.setSourceId(source.getId());
connectionEntity.setDestinationType("PROCESSOR");
connectionEntity.setSourceType(source.getType());
connectionEntity.setRevision(createNewRevision());
return nifiClient.getConnectionClient().createConnection(source.getGroupId(), connectionEntity);
return nifiClient.getConnectionClient().createConnection(groupId, connectionEntity);
}
public ConnectableDTO createConnectableDTO(final ProcessorEntity processor) {
@ -778,6 +814,30 @@ public class NiFiClientUtil {
return childGroup;
}
public PortEntity createInputPort(final String name, final String groupId) throws NiFiClientException, IOException {
final PortDTO component = new PortDTO();
component.setName(name);
component.setParentGroupId(groupId);
final PortEntity inputPortEntity = new PortEntity();
inputPortEntity.setRevision(createNewRevision());
inputPortEntity.setComponent(component);
return nifiClient.getInputPortClient().createInputPort(groupId, inputPortEntity);
}
public PortEntity createOutputPort(final String name, final String groupId) throws NiFiClientException, IOException {
final PortDTO component = new PortDTO();
component.setName(name);
component.setParentGroupId(groupId);
final PortEntity outputPortEntity = new PortEntity();
outputPortEntity.setRevision(createNewRevision());
outputPortEntity.setComponent(component);
return nifiClient.getOutputPortClient().createOutputPort(groupId, outputPortEntity);
}
public ProvenanceEntity queryProvenance(final Map<SearchableField, String> searchTerms, final Long startTime, final Long endTime) throws NiFiClientException, IOException {
final Map<String, String> searchTermsAsStrings = searchTerms.entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().getSearchableFieldName(), Map.Entry::getValue));

View File

@ -30,18 +30,26 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public abstract class NiFiSystemIT {
private static final Logger logger = LoggerFactory.getLogger(NiFiSystemIT.class);
private final ConcurrentMap<String, Long> lastLogTimestamps = new ConcurrentHashMap<>();
public static final int CLIENT_API_PORT = 5671;
public static final String NIFI_GROUP_ID = "org.apache.nifi";
public static final String TEST_EXTENSIONS_ARTIFACT_ID = "nifi-system-test-extensions-nar";
@ -131,6 +139,8 @@ public abstract class NiFiSystemIT {
}
protected void waitForAllNodesConnected(final int expectedNumberOfNodes, final long sleepMillis) {
logger.info("Waiting for {} nodes to connect", expectedNumberOfNodes);
final NiFiClient client = getNifiClient();
final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60);
@ -142,6 +152,8 @@ public abstract class NiFiSystemIT {
return;
}
logEverySecond("Waiting for {} nodes to connect but currently on {} nodes are connected", expectedNumberOfNodes, connectedNodeCount);
if (System.currentTimeMillis() > maxTime) {
throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected");
}
@ -262,10 +274,27 @@ public abstract class NiFiSystemIT {
}
protected void waitForQueueCount(final String connectionId, final int queueSize) throws InterruptedException {
logger.info("Waiting for Queue Count of {} on Connection {}", queueSize, connectionId);
waitFor(() -> {
final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
return statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == queueSize;
final int currentSize = statusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued();
final String sourceName = statusEntity.getConnectionStatus().getSourceName();
final String destinationName = statusEntity.getConnectionStatus().getDestinationName();
logEverySecond("Current Queue Size for Connection from {} to {} = {}, Waiting for {}", sourceName, destinationName, currentSize, queueSize);
return currentSize == queueSize;
});
logger.info("Queue Count for Connection {} is now {}", connectionId, queueSize);
}
private void logEverySecond(final String message, final Object... args) {
final Long lastLogTime = lastLogTimestamps.get(message);
if (lastLogTime == null || lastLogTime < System.currentTimeMillis() - 1000L) {
logger.info(message, args);
lastLogTimestamps.put(message, System.currentTimeMillis());
}
}
private ConnectionStatusEntity getConnectionStatus(final String connectionId) {

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.pg;
import org.apache.nifi.groups.FlowFileConcurrency;
import org.apache.nifi.groups.FlowFileOutboundPolicy;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.PortEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
@Test
public void testSingleConcurrency() throws NiFiClientException, IOException, InterruptedException {
final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root");
final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId());
final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId());
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("Batch Size", "3"));
getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupEntity.getId());
getClientUtil().updateProcessorProperties(sleep, Collections.singletonMap("onTrigger Sleep Time", "5 sec"));
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
// Connect Generate -> Input Port -> Sleep -> Output Port -> Terminate
// Since we will use Single FlowFile at a time concurrency, we should see that the connection between Input Port and Sleep
// never has more than 1 FlowFile even though the Sleep processor takes a long time.
final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success");
final ConnectionEntity inputToSleep = getClientUtil().createConnection(inputPort, sleep);
getClientUtil().createConnection(sleep, outputPort, "success");
final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate);
processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name());
getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity);
// Start all components except for Terminate. We want the data to queue up before terminate so we can ensure that the
// correct number of FlowFiles are queued up.
getClientUtil().startProcessGroupComponents("root");
getNifiClient().getProcessorClient().stopProcessor(terminate);
// Wait for 1 FlowFile to queue up for the Sleep Processor. This should leave 2 FlowFiles queued up for the input port.
waitForQueueCount(inputToSleep.getId(), 1);
assertEquals(2, getConnectionQueueSize(generateToInput.getId()));
// Wait until only 1 FlowFile is queued up for the input port. Because Sleep should take 5 seconds to complete its job,
// It should take 5 seconds for this to happen. But it won't be exact. So we'll ensure that it takes at least 3 seconds. We could
// put an upper bound such as 6 or 7 seconds as well, but it's a good idea to avoid that because the tests may run in some environments
// with constrained resources that may take a lot longer to run.
final long startTime = System.currentTimeMillis();
waitForQueueCount(generateToInput.getId(), 1);
final long endTime = System.currentTimeMillis();
final long delay = endTime - startTime;
assertTrue(delay > 3000L);
assertEquals(1, getConnectionQueueSize(inputToSleep.getId()));
waitForQueueCount(outputToTerminate.getId(), 2);
// Wait until all FlowFiles have been ingested.
waitForQueueCount(generateToInput.getId(), 0);
assertEquals(1, getConnectionQueueSize(inputToSleep.getId()));
// Ensure that 3 FlowFiles are queued up for Terminate
waitForQueueCount(outputToTerminate.getId(), 3);
}
@Test
public void testSingleConcurrencyAndBatchOutput() throws NiFiClientException, IOException, InterruptedException {
final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root");
final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId());
final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId());
final PortEntity secondOut = getClientUtil().createOutputPort("Out2", processGroupEntity.getId());
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
final ProcessorEntity sleep = getClientUtil().createProcessor("Sleep", processGroupEntity.getId()); // sleep with default configuration is just a simple pass-through
final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
// Connect Generate -> Input Port -> Count -> Output Port -> Terminate
// Also connect InputPort -> Out2 -> Terminate
final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success");
final ConnectionEntity inputToSleep = getClientUtil().createConnection(inputPort, sleep);
final ConnectionEntity sleepToOutput = getClientUtil().createConnection(sleep, outputPort, "success");
final ConnectionEntity inputToSecondOut = getClientUtil().createConnection(inputPort, secondOut);
final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate);
final ConnectionEntity secondOutToTerminate = getClientUtil().createConnection(secondOut, terminate);
processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name());
processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name());
getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity);
// Start generate so that data is created. Start Input Port so that the data is ingested.
// Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor,
// and that should prevent data from being transferred by Output Port "Out2" also.
getNifiClient().getProcessorClient().startProcessor(generate);
getNifiClient().getInputPortClient().startInputPort(inputPort);
getNifiClient().getOutputPortClient().startOutputPort(outputPort);
getNifiClient().getOutputPortClient().startOutputPort(secondOut);
waitForQueueCount(inputToSleep.getId(), 1);
assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
// Wait 3 seconds to ensure that data is never transferred
for (int i=0; i < 3; i++) {
Thread.sleep(1000L);
assertEquals(1, getConnectionQueueSize(inputToSleep.getId()));
assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId()));
}
// Start Sleep
getNifiClient().getProcessorClient().startProcessor(sleep);
// Data should now flow from both output ports.
waitForQueueCount(inputToSleep.getId(), 0);
waitForQueueCount(inputToSecondOut.getId(), 0);
assertEquals(1, getConnectionQueueSize(outputToTerminate.getId()));
assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId()));
}
}

View File

@ -30,7 +30,13 @@ public interface OutputPortClient {
PortEntity deleteOutputPort(PortEntity entity) throws NiFiClientException, IOException;
/**
* @deprecated use startOutputPort
*/
@Deprecated
PortEntity startInpuOutputPort(PortEntity entity) throws NiFiClientException, IOException;
PortEntity startOutputPort(PortEntity entity) throws NiFiClientException, IOException;
PortEntity stopOutputPort(PortEntity entity) throws NiFiClientException, IOException;
}

View File

@ -62,6 +62,11 @@ public class JerseyOutputPortClient extends CRUDJerseyClient<PortEntity> impleme
@Override
public PortEntity startInpuOutputPort(final PortEntity entity) throws NiFiClientException, IOException {
return startOutputPort(entity);
}
@Override
public PortEntity startOutputPort(final PortEntity entity) throws NiFiClientException, IOException {
final PortEntity startEntity = createStateEntity(entity, "RUNNING");
return updateOutputPort(startEntity);
}