From 44fc4d9f27cad664186e1b7a397a785dd90644ec Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 16 Jun 2020 16:50:10 -0400 Subject: [PATCH] NIFI-7552: When Process Group is configured to transfer data in batch, add an attribute to each outbound FlowFile that indicates how many FlowFiles went to each port. Updated user guide to explain the new attributes. Signed-off-by: Pierre Villard This closes #4345. --- nifi-docs/src/main/asciidoc/user-guide.adoc | 6 +- .../org/apache/nifi/groups/BatchCounts.java | 33 +++++ .../org/apache/nifi/groups/ProcessGroup.java | 7 ++ .../apache/nifi/connectable/LocalPort.java | 35 +++--- .../apache/nifi/groups/NoOpBatchCounts.java | 32 +++++ .../nifi/groups/StandardBatchCounts.java | 114 ++++++++++++++++++ .../nifi/groups/StandardProcessGroup.java | 24 ++++ .../service/mock/MockProcessGroup.java | 7 ++ .../pg/SingleFlowFileConcurrencyIT.java | 88 ++++++++++++++ 9 files changed, 330 insertions(+), 16 deletions(-) create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java create mode 100644 nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 369e87491a..958d4d29be 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -769,7 +769,11 @@ for Output Port B. These conditions are both considered the same in terms of the Using an Outbound Policy of "Batch Output" along with a FlowFile Concurrency of "Single FlowFile Per Node" allows a user to easily ingest a single FlowFile (which in and of itself may represent a batch of data) and then wait until all processing of that FlowFile has completed before continuing on to the next step -in the dataflow (i.e., the next component outside of the Process Group). +in the dataflow (i.e., the next component outside of the Process Group). Additionally, when using this mode, each FlowFile that is transferred out of the Process Group +will be given a series of attributes named "batch.output." for each Output Port in the Process Group. The value will be equal to the number of FlowFiles +that were routed to that Output Port for this batch of data. For example, consider a case where a single FlowFile is split into 5 FlowFiles, and two FlowFiles go to Output Port A, one goes +to Output Port B, and two go to Output Port C, and no FlowFiles go to Output Port D. In this case, each FlowFile will attributes batch.output.A = 2, +batch.output.B = 1, batch.output.C = 2, batch.output.D = 0. The Outbound Policy of "Batch Output" doesn't provide any benefits when used in conjunction with a FlowFile Concurrency of "Unbounded." As a result, the Outbound Policy is ignored if the FlowFile Concurrency is set to "Unbounded." diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java new file mode 100644 index 0000000000..dcdcad6d42 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.Map; + +public interface BatchCounts { + /** + * Resets the counts + */ + void reset(); + + /** + * Captures a mapping of Output Port name to the number of FlowFiles that were transferred to that Output Port + * @return a mapping of Output Port name to the number of FlowFiles that were transferred to that Output Port + */ + Map captureCounts(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 3e111cfcf7..1e8b8d72b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -1104,4 +1104,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @return true if there is data that is queued for Processing, false otherwise */ boolean isDataQueuedForProcessing(); + + /** + * @return the BatchCounts that can be used for determining how many FlowFiles were transferred to each of the Output Ports + * in this Process Group, or null if this Process Group does not have an {@link #getFlowFileOutboundPolicy()} + * of {@link FlowFileOutboundPolicy#BATCH_OUTPUT}. + */ + BatchCounts getBatchCounts(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index c636f921c3..37d619c0b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -20,6 +20,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.BatchCounts; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileGate; import org.apache.nifi.groups.FlowFileOutboundPolicy; @@ -33,7 +34,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -74,28 +77,20 @@ public class LocalPort extends AbstractPort { return maxIterations; } - private boolean[] validateConnections() { - // LocalPort requires both in/out. - final boolean requireInput = true; - final boolean requireOutput = true; - - return new boolean[]{requireInput, hasIncomingConnection(), - requireOutput, !getConnections(Relationship.ANONYMOUS).isEmpty()}; - } - @Override public boolean isValid() { - final boolean[] connectionRequirements = validateConnections(); - return (!connectionRequirements[0] || connectionRequirements[1]) - && (!connectionRequirements[2] || connectionRequirements[3]); + return hasIncomingConnection() && hasOutboundConnection(); + } + + private boolean hasOutboundConnection() { + return !getConnections(Relationship.ANONYMOUS).isEmpty(); } @Override public Collection getValidationErrors() { - final boolean[] connectionRequirements = validateConnections(); final Collection validationErrors = new ArrayList<>(); // Incoming connections are required but not set - if (connectionRequirements[0] && !connectionRequirements[1]) { + if (!hasIncomingConnection()) { validationErrors.add(new ValidationResult.Builder() .explanation("Port has no incoming connections") .subject(String.format("Port '%s'", getName())) @@ -104,7 +99,7 @@ public class LocalPort extends AbstractPort { } // Outgoing connections are required but not set - if (connectionRequirements[2] && !connectionRequirements[3]) { + if (!hasOutboundConnection()) { validationErrors.add(new ValidationResult.Builder() .explanation("Port has no outgoing connections") .subject(String.format("Port '%s'", getName())) @@ -195,9 +190,15 @@ public class LocalPort extends AbstractPort { } session.transfer(flowFile, Relationship.ANONYMOUS); + getProcessGroup().getBatchCounts().reset(); } + protected void transferUnboundedConcurrency(final ProcessContext context, final ProcessSession session) { + final Map attributes = new HashMap<>(); + final Map counts = getProcessGroup().getBatchCounts().captureCounts(); + counts.forEach((k, v) -> attributes.put("batch.output." + k, String.valueOf(v))); + Set available = context.getAvailableRelationships(); int iterations = 0; while (!available.isEmpty()) { @@ -206,6 +207,10 @@ public class LocalPort extends AbstractPort { break; } + if (!attributes.isEmpty()) { + flowFiles.forEach(ff -> session.putAllAttributes(ff, attributes)); + } + session.transfer(flowFiles, Relationship.ANONYMOUS); session.commit(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java new file mode 100644 index 0000000000..475745e8b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.Collections; +import java.util.Map; + +public class NoOpBatchCounts implements BatchCounts { + @Override + public void reset() { + } + + @Override + public Map captureCounts() { + return Collections.emptyMap(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java new file mode 100644 index 0000000000..68df51e0e0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Port; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class StandardBatchCounts implements BatchCounts { + private static final Logger logger = LoggerFactory.getLogger(StandardBatchCounts.class); + + private final ProcessGroup processGroup; + private Map counts = null; + private boolean hasBeenReset = false; + private final StateManager stateManager; + + private Map restoredValues; + + public StandardBatchCounts(final ProcessGroup processGroup, final StateManager stateManager) { + this.processGroup = processGroup; + this.stateManager = stateManager; + } + + @Override + public synchronized void reset() { + counts = null; + hasBeenReset = true; + + try { + stateManager.clear(Scope.LOCAL); + } catch (final Exception e) { + logger.error("Failed to update local state for {}. This could result in the batch.output.* attributes being inaccurate if NiFi is restarted before this is resolved", processGroup, e); + } + } + + @Override + public synchronized Map captureCounts() { + // If reset() hasn't been called, we don't want to provide any counts. This can happen if the Process Group's + // FlowFileConcurrency and/or FlowFileOutboundPolicy is changed while data is already being processed in the Process Group. + // In this case, we don't want to provide counts until we've cleared all those FlowFiles out, and when that happens, reset() will be called. + if (!hasBeenReset) { + return restoreState(); + } + + if (counts == null) { + counts = new HashMap<>(); + final Map stateMapValues = new HashMap<>(); + + for (final Port outputPort : processGroup.getOutputPorts()) { + int count = 0; + + for (final Connection connection : outputPort.getIncomingConnections()) { + count += connection.getFlowFileQueue().size().getObjectCount(); + } + + final String name = outputPort.getName(); + counts.put(name, count); + stateMapValues.put(name, String.valueOf(count)); + } + + try { + stateManager.setState(stateMapValues, Scope.LOCAL); + } catch (final Exception e) { + logger.error("Failed to update local state for {}. This could result in the batch.output.* attributes being inaccurate if NiFi is restarted before this is resolved", processGroup, e); + } + } + + return counts; + } + + private Map restoreState() { + if (restoredValues != null) { + return restoredValues; + } + + final StateMap stateMap; + try { + stateMap = stateManager.getState(Scope.LOCAL); + } catch (IOException e) { + logger.error("Failed to restore local state for {}. This could result in the batch.output.* attributes being inaccurate for the current batch of FlowFiles", processGroup, e); + return Collections.emptyMap(); + } + + restoredValues = new HashMap<>(); + + final Map rawValues = stateMap.toMap(); + rawValues.forEach((k, v) -> restoredValues.put(k, Integer.parseInt(v))); + return restoredValues; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index da535275a8..351b9b1e2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -199,6 +199,7 @@ public final class StandardProcessGroup implements ProcessGroup { private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED; private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate(); private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE; + private volatile BatchCounts batchCounts = new NoOpBatchCounts(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -707,6 +708,11 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @Override + public BatchCounts getBatchCounts() { + return batchCounts; + } + @Override public void addProcessGroup(final ProcessGroup group) { if (StringUtils.isEmpty(group.getName())) { @@ -5317,6 +5323,8 @@ public final class StandardProcessGroup implements ProcessGroup { flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued()); break; } + + setBatchCounts(getFlowFileOutboundPolicy(), flowFileConcurrency); } finally { writeLock.unlock(); } @@ -5372,5 +5380,21 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) { this.flowFileOutboundPolicy = flowFileOutboundPolicy; + setBatchCounts(flowFileOutboundPolicy, getFlowFileConcurrency()); + } + + private synchronized void setBatchCounts(final FlowFileOutboundPolicy outboundPolicy, final FlowFileConcurrency flowFileConcurrency) { + if (outboundPolicy == FlowFileOutboundPolicy.BATCH_OUTPUT && flowFileConcurrency == FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE) { + if (batchCounts instanceof NoOpBatchCounts) { + final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(getIdentifier()); + batchCounts = new StandardBatchCounts(this, stateManager); + } + } else { + if (batchCounts != null) { + batchCounts.reset(); + } + + batchCounts = new NoOpBatchCounts(); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index c5f05bc267..d1d94e9426 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -32,9 +32,11 @@ import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.groups.BatchCounts; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileGate; import org.apache.nifi.groups.FlowFileOutboundPolicy; +import org.apache.nifi.groups.NoOpBatchCounts; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; @@ -764,6 +766,11 @@ public class MockProcessGroup implements ProcessGroup { return false; } + @Override + public BatchCounts getBatchCounts() { + return new NoOpBatchCounts(); + } + @Override public void terminateProcessor(ProcessorNode processor) { } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java index 02e8bbc787..ff01a40cc0 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -21,7 +21,10 @@ import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.tests.system.NiFiSystemIT; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; @@ -29,8 +32,11 @@ import org.junit.Test; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { @@ -148,5 +154,87 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { assertEquals(1, getConnectionQueueSize(outputToTerminate.getId())); assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId())); + + final FlowFileEntity outputToTerminateFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(outputToTerminateFlowFile); + final Map attributes = outputToTerminateFlowFile.getFlowFile().getAttributes(); + assertEquals("1", attributes.get("batch.output.Out")); + assertEquals("1", attributes.get("batch.output.Out2")); + + final FlowFileEntity secondOutFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(secondOutFlowFile); + final Map secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); + assertEquals("1", secondOutAttributes.get("batch.output.Out")); + assertEquals("1", secondOutAttributes.get("batch.output.Out2")); } + + + @Test + public void testBatchOutputHasCorrectNumbersOnRestart() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); + final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); + final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); + PortEntity secondOut = getClientUtil().createOutputPort("Out2", processGroupEntity.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Connect Generate -> Input Port -> Count -> Output Port -> Terminate + // Also connect InputPort -> Out2 -> Terminate + final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success"); + final ConnectionEntity inputToOutput = getClientUtil().createConnection(inputPort, outputPort); + final ConnectionEntity inputToSecondOut = getClientUtil().createConnection(inputPort, secondOut); + final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate); + final ConnectionEntity secondOutToTerminate = getClientUtil().createConnection(secondOut, terminate); + + processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name()); + processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name()); + getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity); + + // Start generate so that data is created. Start Input Port so that the data is ingested. + // Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port. + getNifiClient().getProcessorClient().startProcessor(generate); + getNifiClient().getInputPortClient().startInputPort(inputPort); + getNifiClient().getOutputPortClient().startOutputPort(outputPort); + + waitForQueueCount(inputToSecondOut.getId(), 1); + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + + // Everything is queued up at an Output Port so the first Output Port should run and its queue should become empty. + waitForQueueCount(inputToOutput.getId(), 0); + + // Restart nifi. + getNiFiInstance().stop(); + getNiFiInstance().start(true); + + // After nifi, we should see that one FlowFile is still queued up for Out2. + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + + // Get a new copy of the Out2 port because NiFi has restarted so that Revision will be different. + secondOut = getNifiClient().getOutputPortClient().getOutputPort(secondOut.getId()); + getNifiClient().getOutputPortClient().startOutputPort(secondOut); + + // Data should now flow from both output ports. + waitForQueueCount(inputToOutput.getId(), 0); + waitForQueueCount(inputToSecondOut.getId(), 0); + waitForQueueCount(outputToTerminate.getId(), 1); + waitForQueueCount(secondOutToTerminate.getId(), 1); + + // Each FlowFile should now have attributes batch.output.Out = 1, batch.output.Out2 = 1 + // Even though upon restart, the "Out" Port had no FlowFiles because there still was 1 FlowFile that went to Out as part of the same batch. + final FlowFileEntity outputToTerminateFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(outputToTerminateFlowFile); + final Map attributes = outputToTerminateFlowFile.getFlowFile().getAttributes(); + assertEquals("1", attributes.get("batch.output.Out")); + assertEquals("1", attributes.get("batch.output.Out2")); + + final FlowFileEntity secondOutFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(secondOutFlowFile); + final Map secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); + assertEquals("1", secondOutAttributes.get("batch.output.Out")); + assertEquals("1", secondOutAttributes.get("batch.output.Out2")); + } + }