From 7c08fbc4d495bf592ef5993e52c1e5aed20a450b Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Thu, 13 May 2021 11:32:35 -0400 Subject: [PATCH] NIFI-8542: When returning content via TriggerResult.readContent(FlowFile), ensure that we take into account the content claim offset and length Signed-off-by: Matthew Burgess This closes #5076 --- .../engine/StandardExecutionProgress.java | 12 ++- .../nifi/stateless/StatelessSystemIT.java | 1 + .../nifi/stateless/basics/SplittingIT.java | 90 +++++++++++++++++++ 3 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/SplittingIT.java diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java index fd18e84837..1e42d383e3 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/engine/StandardExecutionProgress.java @@ -30,6 +30,7 @@ import org.apache.nifi.stateless.queue.DrainableFlowFileQueue; import org.apache.nifi.stateless.repository.ByteArrayContentRepository; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -143,7 +144,16 @@ public class StandardExecutionProgress implements ExecutionProgress { final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile; final ContentClaim contentClaim = flowFileRecord.getContentClaim(); - return contentRepository.getBytes(contentClaim); + final byte[] contentClaimContents = contentRepository.getBytes(contentClaim); + final long offset = flowFileRecord.getContentClaimOffset(); + final long size = flowFileRecord.getSize(); + + if (offset == 0 && size == contentClaimContents.length) { + return contentClaimContents; + } + + final byte[] flowFileContents = Arrays.copyOfRange(contentClaimContents, (int) offset, (int) (size + offset)); + return flowFileContents; } @Override diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java index 268061b80f..e805fb5527 100644 --- a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/StatelessSystemIT.java @@ -160,6 +160,7 @@ public class StatelessSystemIT { final StatelessBootstrap bootstrap = StatelessBootstrap.bootstrap(getEngineConfiguration()); final StatelessDataflow dataflow = bootstrap.createDataflow(dataflowDefinition, Collections.emptyList()); + dataflow.initialize(); createdFlows.add(dataflow); return dataflow; diff --git a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/SplittingIT.java b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/SplittingIT.java new file mode 100644 index 0000000000..c6699e408c --- /dev/null +++ b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/SplittingIT.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.stateless.basics; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.registry.flow.VersionedPort; +import org.apache.nifi.registry.flow.VersionedProcessor; +import org.apache.nifi.stateless.StatelessSystemIT; +import org.apache.nifi.stateless.VersionedFlowBuilder; +import org.apache.nifi.stateless.config.StatelessConfigurationException; +import org.apache.nifi.stateless.flow.DataflowTrigger; +import org.apache.nifi.stateless.flow.StatelessDataflow; +import org.apache.nifi.stateless.flow.TriggerResult; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SplittingIT extends StatelessSystemIT { + @Test + public void testSplitByWriting() throws IOException, StatelessConfigurationException, InterruptedException { + testSplit(false); + } + + @Test + public void testSplitByClone() throws IOException, StatelessConfigurationException, InterruptedException { + testSplit(true); + } + + private void testSplit(final boolean useClone) throws IOException, StatelessConfigurationException, InterruptedException { + // Build the flow + final VersionedFlowBuilder flowBuilder = new VersionedFlowBuilder(); + final VersionedPort outPort = flowBuilder.createOutputPort("Out"); + + final VersionedProcessor generate = flowBuilder.createSimpleProcessor("GenerateFlowFile"); + final Map generateProperties = new HashMap<>(); + generateProperties.put("Text", "abc\n123\nxyz\n321"); + generateProperties.put("Batch Size", "1"); + generate.setProperties(generateProperties); + + final VersionedProcessor split = flowBuilder.createSimpleProcessor("SplitByLine"); + final Map splitProperties = Collections.singletonMap("Use Clone", String.valueOf(useClone)); + split.setProperties(splitProperties); + + flowBuilder.createConnection(generate, split, "success"); + flowBuilder.createConnection(split, outPort, "success"); + + // Startup the dataflow + final StatelessDataflow dataflow = loadDataflow(flowBuilder.getFlowSnapshot(), Collections.emptyList()); + + // Enqueue data and trigger + final DataflowTrigger trigger = dataflow.trigger(); + final TriggerResult result = trigger.getResult(); + assertTrue(result.isSuccessful()); + result.acknowledge(); + + final List flowFiles = result.getOutputFlowFiles("Out"); + assertEquals(4, flowFiles.size()); + + final String[] expectedContent = new String[] {"abc", "123", "xyz", "321"}; + for (int i=0; i < expectedContent.length; i++) { + final String expected = expectedContent[i]; + + final FlowFile flowFile = flowFiles.get(i); + final String outputContent = new String(result.readContent(flowFile)); + assertEquals(expected, outputContent); + } + } +}