diff --git a/nifi-docs/src/main/asciidoc/user-guide.adoc b/nifi-docs/src/main/asciidoc/user-guide.adoc index 369e87491a..958d4d29be 100644 --- a/nifi-docs/src/main/asciidoc/user-guide.adoc +++ b/nifi-docs/src/main/asciidoc/user-guide.adoc @@ -769,7 +769,11 @@ for Output Port B. These conditions are both considered the same in terms of the Using an Outbound Policy of "Batch Output" along with a FlowFile Concurrency of "Single FlowFile Per Node" allows a user to easily ingest a single FlowFile (which in and of itself may represent a batch of data) and then wait until all processing of that FlowFile has completed before continuing on to the next step -in the dataflow (i.e., the next component outside of the Process Group). +in the dataflow (i.e., the next component outside of the Process Group). Additionally, when using this mode, each FlowFile that is transferred out of the Process Group +will be given a series of attributes named "batch.output." for each Output Port in the Process Group. The value will be equal to the number of FlowFiles +that were routed to that Output Port for this batch of data. For example, consider a case where a single FlowFile is split into 5 FlowFiles, and two FlowFiles go to Output Port A, one goes +to Output Port B, and two go to Output Port C, and no FlowFiles go to Output Port D. In this case, each FlowFile will attributes batch.output.A = 2, +batch.output.B = 1, batch.output.C = 2, batch.output.D = 0. The Outbound Policy of "Batch Output" doesn't provide any benefits when used in conjunction with a FlowFile Concurrency of "Unbounded." As a result, the Outbound Policy is ignored if the FlowFile Concurrency is set to "Unbounded." diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java new file mode 100644 index 0000000000..dcdcad6d42 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/BatchCounts.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.Map; + +public interface BatchCounts { + /** + * Resets the counts + */ + void reset(); + + /** + * Captures a mapping of Output Port name to the number of FlowFiles that were transferred to that Output Port + * @return a mapping of Output Port name to the number of FlowFiles that were transferred to that Output Port + */ + Map captureCounts(); +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index 3e111cfcf7..1e8b8d72b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -1104,4 +1104,11 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi * @return true if there is data that is queued for Processing, false otherwise */ boolean isDataQueuedForProcessing(); + + /** + * @return the BatchCounts that can be used for determining how many FlowFiles were transferred to each of the Output Ports + * in this Process Group, or null if this Process Group does not have an {@link #getFlowFileOutboundPolicy()} + * of {@link FlowFileOutboundPolicy#BATCH_OUTPUT}. + */ + BatchCounts getBatchCounts(); } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java index c636f921c3..37d619c0b5 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/LocalPort.java @@ -20,6 +20,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractPort; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.groups.BatchCounts; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileGate; import org.apache.nifi.groups.FlowFileOutboundPolicy; @@ -33,7 +34,9 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -74,28 +77,20 @@ public class LocalPort extends AbstractPort { return maxIterations; } - private boolean[] validateConnections() { - // LocalPort requires both in/out. - final boolean requireInput = true; - final boolean requireOutput = true; - - return new boolean[]{requireInput, hasIncomingConnection(), - requireOutput, !getConnections(Relationship.ANONYMOUS).isEmpty()}; - } - @Override public boolean isValid() { - final boolean[] connectionRequirements = validateConnections(); - return (!connectionRequirements[0] || connectionRequirements[1]) - && (!connectionRequirements[2] || connectionRequirements[3]); + return hasIncomingConnection() && hasOutboundConnection(); + } + + private boolean hasOutboundConnection() { + return !getConnections(Relationship.ANONYMOUS).isEmpty(); } @Override public Collection getValidationErrors() { - final boolean[] connectionRequirements = validateConnections(); final Collection validationErrors = new ArrayList<>(); // Incoming connections are required but not set - if (connectionRequirements[0] && !connectionRequirements[1]) { + if (!hasIncomingConnection()) { validationErrors.add(new ValidationResult.Builder() .explanation("Port has no incoming connections") .subject(String.format("Port '%s'", getName())) @@ -104,7 +99,7 @@ public class LocalPort extends AbstractPort { } // Outgoing connections are required but not set - if (connectionRequirements[2] && !connectionRequirements[3]) { + if (!hasOutboundConnection()) { validationErrors.add(new ValidationResult.Builder() .explanation("Port has no outgoing connections") .subject(String.format("Port '%s'", getName())) @@ -195,9 +190,15 @@ public class LocalPort extends AbstractPort { } session.transfer(flowFile, Relationship.ANONYMOUS); + getProcessGroup().getBatchCounts().reset(); } + protected void transferUnboundedConcurrency(final ProcessContext context, final ProcessSession session) { + final Map attributes = new HashMap<>(); + final Map counts = getProcessGroup().getBatchCounts().captureCounts(); + counts.forEach((k, v) -> attributes.put("batch.output." + k, String.valueOf(v))); + Set available = context.getAvailableRelationships(); int iterations = 0; while (!available.isEmpty()) { @@ -206,6 +207,10 @@ public class LocalPort extends AbstractPort { break; } + if (!attributes.isEmpty()) { + flowFiles.forEach(ff -> session.putAllAttributes(ff, attributes)); + } + session.transfer(flowFiles, Relationship.ANONYMOUS); session.commit(); diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java new file mode 100644 index 0000000000..475745e8b8 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/NoOpBatchCounts.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import java.util.Collections; +import java.util.Map; + +public class NoOpBatchCounts implements BatchCounts { + @Override + public void reset() { + } + + @Override + public Map captureCounts() { + return Collections.emptyMap(); + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java new file mode 100644 index 0000000000..68df51e0e0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardBatchCounts.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.groups; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.connectable.Port; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class StandardBatchCounts implements BatchCounts { + private static final Logger logger = LoggerFactory.getLogger(StandardBatchCounts.class); + + private final ProcessGroup processGroup; + private Map counts = null; + private boolean hasBeenReset = false; + private final StateManager stateManager; + + private Map restoredValues; + + public StandardBatchCounts(final ProcessGroup processGroup, final StateManager stateManager) { + this.processGroup = processGroup; + this.stateManager = stateManager; + } + + @Override + public synchronized void reset() { + counts = null; + hasBeenReset = true; + + try { + stateManager.clear(Scope.LOCAL); + } catch (final Exception e) { + logger.error("Failed to update local state for {}. This could result in the batch.output.* attributes being inaccurate if NiFi is restarted before this is resolved", processGroup, e); + } + } + + @Override + public synchronized Map captureCounts() { + // If reset() hasn't been called, we don't want to provide any counts. This can happen if the Process Group's + // FlowFileConcurrency and/or FlowFileOutboundPolicy is changed while data is already being processed in the Process Group. + // In this case, we don't want to provide counts until we've cleared all those FlowFiles out, and when that happens, reset() will be called. + if (!hasBeenReset) { + return restoreState(); + } + + if (counts == null) { + counts = new HashMap<>(); + final Map stateMapValues = new HashMap<>(); + + for (final Port outputPort : processGroup.getOutputPorts()) { + int count = 0; + + for (final Connection connection : outputPort.getIncomingConnections()) { + count += connection.getFlowFileQueue().size().getObjectCount(); + } + + final String name = outputPort.getName(); + counts.put(name, count); + stateMapValues.put(name, String.valueOf(count)); + } + + try { + stateManager.setState(stateMapValues, Scope.LOCAL); + } catch (final Exception e) { + logger.error("Failed to update local state for {}. This could result in the batch.output.* attributes being inaccurate if NiFi is restarted before this is resolved", processGroup, e); + } + } + + return counts; + } + + private Map restoreState() { + if (restoredValues != null) { + return restoredValues; + } + + final StateMap stateMap; + try { + stateMap = stateManager.getState(Scope.LOCAL); + } catch (IOException e) { + logger.error("Failed to restore local state for {}. This could result in the batch.output.* attributes being inaccurate for the current batch of FlowFiles", processGroup, e); + return Collections.emptyMap(); + } + + restoredValues = new HashMap<>(); + + final Map rawValues = stateMap.toMap(); + rawValues.forEach((k, v) -> restoredValues.put(k, Integer.parseInt(v))); + return restoredValues; + } +} diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index da535275a8..351b9b1e2d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -199,6 +199,7 @@ public final class StandardProcessGroup implements ProcessGroup { private FlowFileConcurrency flowFileConcurrency = FlowFileConcurrency.UNBOUNDED; private volatile FlowFileGate flowFileGate = new UnboundedFlowFileGate(); private volatile FlowFileOutboundPolicy flowFileOutboundPolicy = FlowFileOutboundPolicy.STREAM_WHEN_AVAILABLE; + private volatile BatchCounts batchCounts = new NoOpBatchCounts(); private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); private final Lock readLock = rwLock.readLock(); @@ -707,6 +708,11 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @Override + public BatchCounts getBatchCounts() { + return batchCounts; + } + @Override public void addProcessGroup(final ProcessGroup group) { if (StringUtils.isEmpty(group.getName())) { @@ -5317,6 +5323,8 @@ public final class StandardProcessGroup implements ProcessGroup { flowFileGate = new SingleConcurrencyFlowFileGate(() -> !isDataQueued()); break; } + + setBatchCounts(getFlowFileOutboundPolicy(), flowFileConcurrency); } finally { writeLock.unlock(); } @@ -5372,5 +5380,21 @@ public final class StandardProcessGroup implements ProcessGroup { @Override public void setFlowFileOutboundPolicy(final FlowFileOutboundPolicy flowFileOutboundPolicy) { this.flowFileOutboundPolicy = flowFileOutboundPolicy; + setBatchCounts(flowFileOutboundPolicy, getFlowFileConcurrency()); + } + + private synchronized void setBatchCounts(final FlowFileOutboundPolicy outboundPolicy, final FlowFileConcurrency flowFileConcurrency) { + if (outboundPolicy == FlowFileOutboundPolicy.BATCH_OUTPUT && flowFileConcurrency == FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE) { + if (batchCounts instanceof NoOpBatchCounts) { + final StateManager stateManager = flowController.getStateManagerProvider().getStateManager(getIdentifier()); + batchCounts = new StandardBatchCounts(this, stateManager); + } + } else { + if (batchCounts != null) { + batchCounts.reset(); + } + + batchCounts = new NoOpBatchCounts(); + } } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java index c5f05bc267..d1d94e9426 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java @@ -32,9 +32,11 @@ import org.apache.nifi.controller.Snippet; import org.apache.nifi.controller.Template; import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceNode; +import org.apache.nifi.groups.BatchCounts; import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileGate; import org.apache.nifi.groups.FlowFileOutboundPolicy; +import org.apache.nifi.groups.NoOpBatchCounts; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; @@ -764,6 +766,11 @@ public class MockProcessGroup implements ProcessGroup { return false; } + @Override + public BatchCounts getBatchCounts() { + return new NoOpBatchCounts(); + } + @Override public void terminateProcessor(ProcessorNode processor) { } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java index 02e8bbc787..ff01a40cc0 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -21,7 +21,10 @@ import org.apache.nifi.groups.FlowFileConcurrency; import org.apache.nifi.groups.FlowFileOutboundPolicy; import org.apache.nifi.tests.system.NiFiSystemIT; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.dto.FlowFileSummaryDTO; import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.FlowFileEntity; +import org.apache.nifi.web.api.entity.ListingRequestEntity; import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; @@ -29,8 +32,11 @@ import org.junit.Test; import java.io.IOException; import java.util.Collections; +import java.util.List; +import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { @@ -148,5 +154,87 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { assertEquals(1, getConnectionQueueSize(outputToTerminate.getId())); assertEquals(1, getConnectionQueueSize(secondOutToTerminate.getId())); + + final FlowFileEntity outputToTerminateFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(outputToTerminateFlowFile); + final Map attributes = outputToTerminateFlowFile.getFlowFile().getAttributes(); + assertEquals("1", attributes.get("batch.output.Out")); + assertEquals("1", attributes.get("batch.output.Out2")); + + final FlowFileEntity secondOutFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(secondOutFlowFile); + final Map secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); + assertEquals("1", secondOutAttributes.get("batch.output.Out")); + assertEquals("1", secondOutAttributes.get("batch.output.Out2")); } + + + @Test + public void testBatchOutputHasCorrectNumbersOnRestart() throws NiFiClientException, IOException, InterruptedException { + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); + final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); + final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); + PortEntity secondOut = getClientUtil().createOutputPort("Out2", processGroupEntity.getId()); + + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + + // Connect Generate -> Input Port -> Count -> Output Port -> Terminate + // Also connect InputPort -> Out2 -> Terminate + final ConnectionEntity generateToInput = getClientUtil().createConnection(generate, inputPort, "success"); + final ConnectionEntity inputToOutput = getClientUtil().createConnection(inputPort, outputPort); + final ConnectionEntity inputToSecondOut = getClientUtil().createConnection(inputPort, secondOut); + final ConnectionEntity outputToTerminate = getClientUtil().createConnection(outputPort, terminate); + final ConnectionEntity secondOutToTerminate = getClientUtil().createConnection(secondOut, terminate); + + processGroupEntity.getComponent().setFlowfileConcurrency(FlowFileConcurrency.SINGLE_FLOWFILE_PER_NODE.name()); + processGroupEntity.getComponent().setFlowfileOutboundPolicy(FlowFileOutboundPolicy.BATCH_OUTPUT.name()); + getNifiClient().getProcessGroupClient().updateProcessGroup(processGroupEntity); + + // Start generate so that data is created. Start Input Port so that the data is ingested. + // Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port. + getNifiClient().getProcessorClient().startProcessor(generate); + getNifiClient().getInputPortClient().startInputPort(inputPort); + getNifiClient().getOutputPortClient().startOutputPort(outputPort); + + waitForQueueCount(inputToSecondOut.getId(), 1); + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + + // Everything is queued up at an Output Port so the first Output Port should run and its queue should become empty. + waitForQueueCount(inputToOutput.getId(), 0); + + // Restart nifi. + getNiFiInstance().stop(); + getNiFiInstance().start(true); + + // After nifi, we should see that one FlowFile is still queued up for Out2. + assertEquals(1, getConnectionQueueSize(inputToSecondOut.getId())); + + // Get a new copy of the Out2 port because NiFi has restarted so that Revision will be different. + secondOut = getNifiClient().getOutputPortClient().getOutputPort(secondOut.getId()); + getNifiClient().getOutputPortClient().startOutputPort(secondOut); + + // Data should now flow from both output ports. + waitForQueueCount(inputToOutput.getId(), 0); + waitForQueueCount(inputToSecondOut.getId(), 0); + waitForQueueCount(outputToTerminate.getId(), 1); + waitForQueueCount(secondOutToTerminate.getId(), 1); + + // Each FlowFile should now have attributes batch.output.Out = 1, batch.output.Out2 = 1 + // Even though upon restart, the "Out" Port had no FlowFiles because there still was 1 FlowFile that went to Out as part of the same batch. + final FlowFileEntity outputToTerminateFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(outputToTerminateFlowFile); + final Map attributes = outputToTerminateFlowFile.getFlowFile().getAttributes(); + assertEquals("1", attributes.get("batch.output.Out")); + assertEquals("1", attributes.get("batch.output.Out2")); + + final FlowFileEntity secondOutFlowFile = getClientUtil().getQueueFlowFile(outputToTerminate.getId(), 0); + assertNotNull(secondOutFlowFile); + final Map secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); + assertEquals("1", secondOutAttributes.get("batch.output.Out")); + assertEquals("1", secondOutAttributes.get("batch.output.Out2")); + } + }