From cf4e966d912975dfeac1cbf80f9e073536405897 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 17 Mar 2021 13:30:38 -0400 Subject: [PATCH] NIFI-8337: This closes #4910. Fixed bug in StandardProcessSession where the session didn't account for FlowFile's contentClaimOffset when seeking to the appropriate location in the stream. Signed-off-by: Joe Witt --- .../repository/StandardProcessSession.java | 4 +- .../tests/system/ReverseContents.java | 2 + .../processors/tests/system/SplitByLine.java | 126 +++++++++++++++ .../tests/system/VerifyContents.java | 102 ++++++++++++ .../org.apache.nifi.processor.Processor | 2 + .../nifi/tests/system/NiFiClientUtil.java | 23 +++ .../tests/system/processor/RunOnceIT.java | 3 - .../system/repositories/ContentAccessIT.java | 150 ++++++++++++++++++ 8 files changed, 407 insertions(+), 5 deletions(-) create mode 100644 nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java create mode 100644 nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index 1d24f33b36..b2b1bde9b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2300,9 +2300,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn currentReadClaim = claim.getResourceClaim(); final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim()); - StreamUtils.skip(contentRepoStream, claim.getOffset()); + StreamUtils.skip(contentRepoStream, claim.getOffset() + contentClaimOffset); final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream); - final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset()); + final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset() + contentClaimOffset); currentReadClaimStream = byteCountingInputStream; // Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java index cfb112404a..62c8dc2ccd 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/ReverseContents.java @@ -17,6 +17,7 @@ package org.apache.nifi.processors.tests.system; +import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -31,6 +32,7 @@ import java.io.OutputStream; import java.util.Collections; import java.util.Set; +@SupportsBatching public class ReverseContents extends AbstractProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java new file mode 100644 index 0000000000..93b8e141e4 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/SplitByLine.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.util.TextLineDemarcator; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public class SplitByLine extends AbstractProcessor { + + static final PropertyDescriptor USE_CLONE = new PropertyDescriptor.Builder() + .name("Use Clone") + .description("Whether or not to use session.clone for generating children FlowFiles") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .build(); + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return Collections.singletonList(USE_CLONE); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final boolean clone = context.getProperty(USE_CLONE).asBoolean(); + if (clone) { + splitByClone(session, flowFile); + } else { + splitByWrite(session, flowFile); + } + + session.remove(flowFile); + } + + private void splitByClone(ProcessSession session, FlowFile flowFile) { + final List offsetInfos = new ArrayList<>(); + + try (final InputStream in = session.read(flowFile); + final TextLineDemarcator demarcator = new TextLineDemarcator(in)) { + + TextLineDemarcator.OffsetInfo offsetInfo; + while ((offsetInfo = demarcator.nextOffsetInfo()) != null) { + offsetInfos.add(offsetInfo); + } + } catch (final Exception e) { + throw new ProcessException(e); + } + + for (final TextLineDemarcator.OffsetInfo offsetInfo : offsetInfos) { + FlowFile child = session.clone(flowFile, offsetInfo.getStartOffset(), offsetInfo.getLength() - offsetInfo.getCrlfLength()); + session.putAttribute(child, "num.lines", String.valueOf(offsetInfos.size())); + session.transfer(child, REL_SUCCESS); + } + } + + private void splitByWrite(ProcessSession session, FlowFile flowFile) { + final List children = new ArrayList<>(); + try (final InputStream in = session.read(flowFile); + final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + + String line; + while ((line = reader.readLine()) != null) { + FlowFile child = session.create(flowFile); + children.add(child); + + try (final OutputStream out = session.write(child)) { + final byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8); + out.write(lineBytes); + } + } + } catch (final Exception e) { + throw new ProcessException(e); + } + + for (FlowFile child : children) { + session.putAttribute(child, "num.lines", String.valueOf(children.size())); + } + + session.transfer(children, REL_SUCCESS); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java new file mode 100644 index 0000000000..9454e968cd --- /dev/null +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/VerifyContents.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.tests.system; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +public class VerifyContents extends AbstractProcessor { + private static final Relationship REL_UNMATCHED = new Relationship.Builder() + .name("unmatched") + .build(); + + private final AtomicReference> relationshipsRef = new AtomicReference<>(Collections.singleton(REL_UNMATCHED)); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .dynamic(true) + .addValidator(Validator.VALID) + .build(); + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + final Relationship relationship = new Relationship.Builder() + .name(descriptor.getName()) + .build(); + + final Set updatedRelationships = new HashSet<>(relationshipsRef.get()); + + if (newValue == null) { + updatedRelationships.remove(relationship); + } else { + updatedRelationships.add(relationship); + } + + updatedRelationships.add(REL_UNMATCHED); // Ensure that the unmatched relationship is always available + relationshipsRef.set(updatedRelationships); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final String contents; + try (final InputStream in = session.read(flowFile); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + + StreamUtils.copy(in, baos); + contents = new String(baos.toByteArray(), StandardCharsets.UTF_8); + } catch (final Exception e) { + throw new ProcessException(e); + } + + for (final Map.Entry entry : context.getProperties().entrySet()) { + final String propertyName = entry.getKey().getName(); + if (contents.equals(entry.getValue())) { + getLogger().info("Routing {} to {}", flowFile, propertyName); + session.transfer(flowFile, new Relationship.Builder().name(propertyName).build()); + return; + } + } + + getLogger().info("Routing {} to {}", flowFile, REL_UNMATCHED); + session.transfer(flowFile, REL_UNMATCHED); + } +} diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index c6588f63e4..c0b025c3fa 100644 --- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -26,7 +26,9 @@ org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading org.apache.nifi.processors.tests.system.ReverseContents org.apache.nifi.processors.tests.system.SetAttribute org.apache.nifi.processors.tests.system.Sleep +org.apache.nifi.processors.tests.system.SplitByLine org.apache.nifi.processors.tests.system.TerminateFlowFile org.apache.nifi.processors.tests.system.ThrowProcessException org.apache.nifi.processors.tests.system.ValidateFileExists +org.apache.nifi.processors.tests.system.VerifyContents org.apache.nifi.processors.tests.system.WriteToFile \ No newline at end of file diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 7616a29e00..feb7bda40c 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -24,6 +24,7 @@ import org.apache.nifi.controller.queue.QueueSize; import org.apache.nifi.provenance.search.SearchableField; import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol; import org.apache.nifi.scheduling.ExecutionNode; +import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; @@ -82,8 +83,10 @@ import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -261,6 +264,12 @@ public class NiFiClientUtil { return updateProcessorConfig(currentEntity, config); } + public ProcessorEntity updateProcessorRunDuration(final ProcessorEntity currentEntity, final int runDuration) throws NiFiClientException, IOException { + final ProcessorConfigDTO config = new ProcessorConfigDTO(); + config.setRunDurationMillis((long) runDuration); + return updateProcessorConfig(currentEntity, config); + } + public ProcessorEntity updateProcessorSchedulingPeriod(final ProcessorEntity currentEntity, final String schedulingPeriod) throws NiFiClientException, IOException { final ProcessorConfigDTO config = new ProcessorConfigDTO(); config.setSchedulingPeriod(schedulingPeriod); @@ -776,6 +785,20 @@ public class NiFiClientUtil { return flowFileEntity; } + public String getFlowFileContentAsUtf8(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException { + final byte[] contents = getFlowFileContentAsByteArray(connectionId, flowFileIndex); + return new String(contents, StandardCharsets.UTF_8); + } + + public byte[] getFlowFileContentAsByteArray(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException { + try (final InputStream in = getFlowFileContent(connectionId, flowFileIndex); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + + StreamUtils.copy(in, baos); + return baos.toByteArray(); + } + } + public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException { final ListingRequestEntity listing = performQueueListing(connectionId); final List flowFileSummaries = listing.getListingRequest().getFlowFileSummaries(); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java index 3d2507f928..c47523faf2 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/processor/RunOnceIT.java @@ -30,7 +30,6 @@ public class RunOnceIT extends NiFiSystemIT { @Test public void testRunOnce() throws NiFiClientException, IOException, InterruptedException { - // GIVEN ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec"); @@ -38,10 +37,8 @@ public class RunOnceIT extends NiFiSystemIT { ConnectionEntity generateToTerminate = getClientUtil().createConnection(generate, terminate, "success"); - // WHEN getNifiClient().getProcessorClient().runProcessorOnce(generate); - // THEN waitForQueueCount(generateToTerminate.getId(), 1); ProcessorEntity actualGenerate = getNifiClient().getProcessorClient().getProcessor(generate.getId()); diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java new file mode 100644 index 0000000000..1a3790fcbb --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.repositories; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +/** + * This test is intended to verify that Processors are able to access the content that their FlowFiles represent in several different situations. + * We test for things like splitting a FlowFile by creating multiple children and writing to them, as well as creating children via processSession.clone(FlowFile flowFile, long offset, long length); + * We also test against Run Duration of 0 ms vs. 25 milliseconds in order to test when a processor writes the contents to multiple FlowFiles in the same session (which will result in writing to the + * same Content Claim) as well as writing to multiple FlowFiles in multiple sessions (which may result in writing to multiple Content Claims). + */ +public class ContentAccessIT extends NiFiSystemIT { + + @Test + public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndWrite() throws NiFiClientException, IOException, InterruptedException { + testCorrectContentReadWhenMultipleFlowFilesInClaim(true, false); + } + + @Test + public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndWrite() throws NiFiClientException, IOException, InterruptedException { + testCorrectContentReadWhenMultipleFlowFilesInClaim(false, false); + } + + @Test + public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndClone() throws NiFiClientException, IOException, InterruptedException { + testCorrectContentReadWhenMultipleFlowFilesInClaim(true, true); + } + + @Test + public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndClone() throws NiFiClientException, IOException, InterruptedException { + testCorrectContentReadWhenMultipleFlowFilesInClaim(false, true); + } + + public void testCorrectContentReadWhenMultipleFlowFilesInClaim(final boolean useBatch, final boolean clone) throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity split = getClientUtil().createProcessor("SplitByLine"); + final ProcessorEntity reverse = getClientUtil().createProcessor("ReverseContents"); + final ProcessorEntity verify = getClientUtil().createProcessor("VerifyContents"); + final ProcessorEntity terminateAa = getClientUtil().createProcessor("TerminateFlowFile"); + final ProcessorEntity terminateBa = getClientUtil().createProcessor("TerminateFlowFile"); + final ProcessorEntity terminateCa = getClientUtil().createProcessor("TerminateFlowFile"); + final ProcessorEntity terminateUnmatched = getClientUtil().createProcessor("TerminateFlowFile"); + + // Configure Generate + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins"); + getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("Text", "{ a : a }\n{ a : b }\n{ a : c }")); + + // Configure split + getClientUtil().updateProcessorProperties(split, Collections.singletonMap("Use Clone", String.valueOf(clone))); + + // Configure Verify + final Map verifyProperties = new HashMap<>(); + verifyProperties.put("aa", "} a : a {"); + verifyProperties.put("ba", "} b : a {"); + verifyProperties.put("ca", "} c : a {"); + getClientUtil().updateProcessorProperties(verify, verifyProperties); + + // Configure batching for reverse + final int runDuration = useBatch ? 25 : 0; + getClientUtil().updateProcessorRunDuration(reverse, runDuration); + + final ConnectionEntity generateToSplit = getClientUtil().createConnection(generate, split, "success"); + final ConnectionEntity splitToReverse = getClientUtil().createConnection(split, reverse, "success"); + final ConnectionEntity reverseToVerify = getClientUtil().createConnection(reverse, verify, "success"); + final ConnectionEntity verifyToTerminateAa = getClientUtil().createConnection(verify, terminateAa, "aa"); + final ConnectionEntity verifyToTerminateBa = getClientUtil().createConnection(verify, terminateBa, "ba"); + final ConnectionEntity verifyToTerminateCa = getClientUtil().createConnection(verify, terminateCa, "ca"); + final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateAa, "unmatched"); + + // Run Generate processor, wait for its output + getNifiClient().getProcessorClient().startProcessor(generate); + waitForQueueCount(generateToSplit.getId(), 1); + + // Run split processor, wait for its output + getNifiClient().getProcessorClient().startProcessor(split); + waitForQueueCount(splitToReverse.getId(), 3); + + // Verify output of the Split processor + final String firstSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 0); + final String secondSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 1); + final String thirdSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 2); + + // Verify that we get both expected outputs. We put them in a set and ensure that the set contains both because we don't know the order + // that they will be in. The reason we don't know the order is because if we are using batching, the contents will be in the same output + // Content Claim, otherwise they won't be. If they are not, the order can change. + final Set splitContents = new HashSet<>(); + splitContents.add(firstSplitContents); + splitContents.add(secondSplitContents); + splitContents.add(thirdSplitContents); + + assertTrue(splitContents.contains("{ a : a }")); + assertTrue(splitContents.contains("{ a : b }")); + assertTrue(splitContents.contains("{ a : c }")); + + // Start the reverse processor, wait for its output + getNifiClient().getProcessorClient().startProcessor(reverse); + waitForQueueCount(reverseToVerify.getId(), 3); + + final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0); + final String secondReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 1); + final String thirdReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 2); + + final Set reversedContents = new HashSet<>(); + reversedContents.add(firstReversedContents); + reversedContents.add(secondReversedContents); + reversedContents.add(thirdReversedContents); + + assertTrue(reversedContents.contains("} a : a {")); + assertTrue(reversedContents.contains("} b : a {")); + assertTrue(reversedContents.contains("} c : a {")); + + // Start verify processor. This is different than verify the contents above because doing so above is handled by making a REST call, which does not make use + // of the ProcessSession. Using the VerifyContents processor ensures that the Processors see the same contents. + getNifiClient().getProcessorClient().startProcessor(verify); + + waitForQueueCount(verifyToTerminateAa.getId(), 1); + waitForQueueCount(verifyToTerminateBa.getId(), 1); + waitForQueueCount(verifyToTerminateCa.getId(), 1); + waitForQueueCount(verifyToTerminateUnmatched.getId(), 0); + } +}