NIFI-8337: This closes #4910. Fixed a bug in StandardProcessSession where the session did not account for the FlowFile's contentClaimOffset when seeking to the appropriate location in the content stream.

Signed-off-by: Joe Witt <joewitt@apache.org>
Mark Payne 2021-03-17 13:30:38 -04:00 committed by Joe Witt
parent e16cc9df46
commit cf4e966d91
8 changed files with 407 additions and 5 deletions

View File

@@ -2300,9 +2300,9 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
currentReadClaim = claim.getResourceClaim();
final InputStream contentRepoStream = context.getContentRepository().read(claim.getResourceClaim());
StreamUtils.skip(contentRepoStream, claim.getOffset());
StreamUtils.skip(contentRepoStream, claim.getOffset() + contentClaimOffset);
final InputStream bufferedContentStream = new BufferedInputStream(contentRepoStream);
final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset());
final ByteCountingInputStream byteCountingInputStream = new ByteCountingInputStream(bufferedContentStream, claim.getOffset() + contentClaimOffset);
currentReadClaimStream = byteCountingInputStream;
// Use a non-closeable stream (DisableOnCloseInputStream) because we want to keep it open after the callback has finished so that we can
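
For context, a FlowFile's content is located by two offsets: the content claim's offset within its resource claim (claim.getOffset()) and the FlowFile's own offset within that content claim (contentClaimOffset). Before the fix, only the first offset was skipped, so a FlowFile that did not start at the beginning of its content claim was read from the wrong position. A minimal, self-contained sketch of the arithmetic (plain Java rather than NiFi code; the byte layout and offset values are illustrative):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class ClaimOffsetSketch {
    public static void main(final String[] args) throws IOException {
        // One resource claim (a single file on disk) holding content for several FlowFiles back to back.
        final byte[] resourceClaim = "HEADER{ a : a }\n{ a : b }\n".getBytes(StandardCharsets.UTF_8);

        final long claimOffset = 6;         // where the content claim starts within the resource claim
        final long contentClaimOffset = 10; // where this FlowFile starts within the content claim

        try (final InputStream in = new ByteArrayInputStream(resourceClaim)) {
            // Skipping only claimOffset would position the stream at the first line;
            // the fix skips claimOffset + contentClaimOffset to land on this FlowFile's content.
            in.skip(claimOffset + contentClaimOffset);

            final byte[] buffer = new byte[10];
            final int read = in.read(buffer);
            System.out.print(new String(buffer, 0, read, StandardCharsets.UTF_8)); // prints "{ a : b }" and a newline
        }
    }
}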

View File

@@ -17,6 +17,7 @@
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -31,6 +32,7 @@ import java.io.OutputStream;
import java.util.Collections;
import java.util.Set;
@SupportsBatching
public class ReverseContents extends AbstractProcessor {
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")

View File

@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.util.TextLineDemarcator;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class SplitByLine extends AbstractProcessor {
static final PropertyDescriptor USE_CLONE = new PropertyDescriptor.Builder()
.name("Use Clone")
.description("Whether or not to use session.clone for generating children FlowFiles")
.required(true)
.defaultValue("true")
.allowableValues("true", "false")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.build();
@Override
public Set<Relationship> getRelationships() {
return Collections.singleton(REL_SUCCESS);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(USE_CLONE);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final boolean clone = context.getProperty(USE_CLONE).asBoolean();
if (clone) {
splitByClone(session, flowFile);
} else {
splitByWrite(session, flowFile);
}
session.remove(flowFile);
}
private void splitByClone(ProcessSession session, FlowFile flowFile) {
final List<TextLineDemarcator.OffsetInfo> offsetInfos = new ArrayList<>();
try (final InputStream in = session.read(flowFile);
final TextLineDemarcator demarcator = new TextLineDemarcator(in)) {
TextLineDemarcator.OffsetInfo offsetInfo;
while ((offsetInfo = demarcator.nextOffsetInfo()) != null) {
offsetInfos.add(offsetInfo);
}
} catch (final Exception e) {
throw new ProcessException(e);
}
for (final TextLineDemarcator.OffsetInfo offsetInfo : offsetInfos) {
FlowFile child = session.clone(flowFile, offsetInfo.getStartOffset(), offsetInfo.getLength() - offsetInfo.getCrlfLength());
session.putAttribute(child, "num.lines", String.valueOf(offsetInfos.size()));
session.transfer(child, REL_SUCCESS);
}
}
private void splitByWrite(ProcessSession session, FlowFile flowFile) {
final List<FlowFile> children = new ArrayList<>();
try (final InputStream in = session.read(flowFile);
final BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
String line;
while ((line = reader.readLine()) != null) {
FlowFile child = session.create(flowFile);
children.add(child);
try (final OutputStream out = session.write(child)) {
final byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8);
out.write(lineBytes);
}
}
} catch (final Exception e) {
throw new ProcessException(e);
}
for (FlowFile child : children) {
session.putAttribute(child, "num.lines", String.valueOf(children.size()));
}
session.transfer(children, REL_SUCCESS);
}
}
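
SplitByLine's clone path relies on TextLineDemarcator to report, for each line, its start offset, total length, and CR/LF length, and then clones the parent over the range (startOffset, length - crlfLength). A small standalone sketch of those values for the text used by the system test further below (the class name and printed values are illustrative):

import org.apache.nifi.stream.io.util.TextLineDemarcator;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class LineOffsetSketch {
    public static void main(final String[] args) throws Exception {
        final byte[] data = "{ a : a }\n{ a : b }\n{ a : c }".getBytes(StandardCharsets.UTF_8);
        try (final TextLineDemarcator demarcator = new TextLineDemarcator(new ByteArrayInputStream(data))) {
            TextLineDemarcator.OffsetInfo info;
            while ((info = demarcator.nextOffsetInfo()) != null) {
                // getLength() includes the line's CR/LF, which is why the clone above subtracts getCrlfLength().
                System.out.println(info.getStartOffset() + " / " + info.getLength() + " / " + info.getCrlfLength());
            }
        }
    }
}

With this input it should print 0 / 10 / 1, then 10 / 10 / 1, then 20 / 9 / 0; the second and third children therefore reference the parent's content claim at non-zero offsets, which is exactly the case the StandardProcessSession fix addresses.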

View File

@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.tests.system;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
public class VerifyContents extends AbstractProcessor {
private static final Relationship REL_UNMATCHED = new Relationship.Builder()
.name("unmatched")
.build();
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>(Collections.singleton(REL_UNMATCHED));
@Override
public Set<Relationship> getRelationships() {
return relationshipsRef.get();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.dynamic(true)
.addValidator(Validator.VALID)
.build();
}
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
final Relationship relationship = new Relationship.Builder()
.name(descriptor.getName())
.build();
final Set<Relationship> updatedRelationships = new HashSet<>(relationshipsRef.get());
if (newValue == null) {
updatedRelationships.remove(relationship);
} else {
updatedRelationships.add(relationship);
}
updatedRelationships.add(REL_UNMATCHED); // Ensure that the unmatched relationship is always available
relationshipsRef.set(updatedRelationships);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final String contents;
try (final InputStream in = session.read(flowFile);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
StreamUtils.copy(in, baos);
contents = new String(baos.toByteArray(), StandardCharsets.UTF_8);
} catch (final Exception e) {
throw new ProcessException(e);
}
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
final String propertyName = entry.getKey().getName();
if (contents.equals(entry.getValue())) {
getLogger().info("Routing {} to {}", flowFile, propertyName);
session.transfer(flowFile, new Relationship.Builder().name(propertyName).build());
return;
}
}
getLogger().info("Routing {} to {}", flowFile, REL_UNMATCHED);
session.transfer(flowFile, REL_UNMATCHED);
}
}
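
VerifyContents routes each FlowFile to the relationship named after whichever dynamic property value exactly matches the FlowFile's content, and to unmatched otherwise. A minimal sketch of exercising it with the nifi-mock TestRunner, assuming nifi-mock is on the test classpath; the test class name and values are illustrative, not part of this commit:

package org.apache.nifi.processors.tests.system;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class VerifyContentsSketchTest {

    @Test
    public void testRoutesByContent() {
        final TestRunner runner = TestRunners.newTestRunner(new VerifyContents());

        // Each dynamic property defines a relationship; its value is the exact content to match.
        runner.setProperty("aa", "} a : a {");

        runner.enqueue("} a : a {"); // matches the "aa" property, so routes to "aa"
        runner.enqueue("no match");  // matches nothing, so routes to "unmatched"
        runner.run(2);

        runner.assertTransferCount("aa", 1);
        runner.assertTransferCount("unmatched", 1);
    }
}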

View File

@@ -26,7 +26,9 @@ org.apache.nifi.processors.tests.system.PassThroughRequiresInstanceClassLoading
org.apache.nifi.processors.tests.system.ReverseContents
org.apache.nifi.processors.tests.system.SetAttribute
org.apache.nifi.processors.tests.system.Sleep
org.apache.nifi.processors.tests.system.SplitByLine
org.apache.nifi.processors.tests.system.TerminateFlowFile
org.apache.nifi.processors.tests.system.ThrowProcessException
org.apache.nifi.processors.tests.system.ValidateFileExists
org.apache.nifi.processors.tests.system.VerifyContents
org.apache.nifi.processors.tests.system.WriteToFile

View File

@@ -24,6 +24,7 @@ import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.protocol.SiteToSiteTransportProtocol;
import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.toolkit.cli.impl.client.nifi.ConnectionClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
@@ -82,8 +83,10 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -261,6 +264,12 @@ public class NiFiClientUtil {
return updateProcessorConfig(currentEntity, config);
}
public ProcessorEntity updateProcessorRunDuration(final ProcessorEntity currentEntity, final int runDuration) throws NiFiClientException, IOException {
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setRunDurationMillis((long) runDuration);
return updateProcessorConfig(currentEntity, config);
}
public ProcessorEntity updateProcessorSchedulingPeriod(final ProcessorEntity currentEntity, final String schedulingPeriod) throws NiFiClientException, IOException {
final ProcessorConfigDTO config = new ProcessorConfigDTO();
config.setSchedulingPeriod(schedulingPeriod);
@@ -776,6 +785,20 @@ public class NiFiClientUtil {
return flowFileEntity;
}
public String getFlowFileContentAsUtf8(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
final byte[] contents = getFlowFileContentAsByteArray(connectionId, flowFileIndex);
return new String(contents, StandardCharsets.UTF_8);
}
public byte[] getFlowFileContentAsByteArray(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
try (final InputStream in = getFlowFileContent(connectionId, flowFileIndex);
final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
StreamUtils.copy(in, baos);
return baos.toByteArray();
}
}
public InputStream getFlowFileContent(final String connectionId, final int flowFileIndex) throws NiFiClientException, IOException {
final ListingRequestEntity listing = performQueueListing(connectionId);
final List<FlowFileSummaryDTO> flowFileSummaries = listing.getListingRequest().getFlowFileSummaries();

View File

@@ -30,7 +30,6 @@ public class RunOnceIT extends NiFiSystemIT {
@Test
public void testRunOnce() throws NiFiClientException, IOException, InterruptedException {
// GIVEN
ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
getClientUtil().updateProcessorSchedulingPeriod(generate, "1 sec");
@@ -38,10 +37,8 @@ public class RunOnceIT extends NiFiSystemIT {
ConnectionEntity generateToTerminate = getClientUtil().createConnection(generate, terminate, "success");
// WHEN
getNifiClient().getProcessorClient().runProcessorOnce(generate);
// THEN
waitForQueueCount(generateToTerminate.getId(), 1);
ProcessorEntity actualGenerate = getNifiClient().getProcessorClient().getProcessor(generate.getId());

View File

@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.tests.system.repositories;
import org.apache.nifi.tests.system.NiFiSystemIT;
import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
import org.apache.nifi.web.api.entity.ConnectionEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertTrue;
/**
* This test is intended to verify that Processors are able to access the content that their FlowFiles represent in several different situations.
* We test things like splitting a FlowFile by creating multiple children and writing to them, as well as creating children via ProcessSession.clone(FlowFile flowFile, long offset, long length).
* We also test with a Run Duration of 0 ms vs. 25 ms in order to cover a processor writing the contents of multiple FlowFiles in the same session (which will result in writing to the
* same Content Claim) as well as writing to multiple FlowFiles in multiple sessions (which may result in writing to multiple Content Claims).
*/
public class ContentAccessIT extends NiFiSystemIT {
@Test
public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndWrite() throws NiFiClientException, IOException, InterruptedException {
testCorrectContentReadWhenMultipleFlowFilesInClaim(true, false);
}
@Test
public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndWrite() throws NiFiClientException, IOException, InterruptedException {
testCorrectContentReadWhenMultipleFlowFilesInClaim(false, false);
}
@Test
public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithBatchAndClone() throws NiFiClientException, IOException, InterruptedException {
testCorrectContentReadWhenMultipleFlowFilesInClaim(true, true);
}
@Test
public void testCorrectContentReadWhenMultipleFlowFilesInClaimWithoutBatchAndClone() throws NiFiClientException, IOException, InterruptedException {
testCorrectContentReadWhenMultipleFlowFilesInClaim(false, true);
}
public void testCorrectContentReadWhenMultipleFlowFilesInClaim(final boolean useBatch, final boolean clone) throws NiFiClientException, IOException, InterruptedException {
final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
final ProcessorEntity split = getClientUtil().createProcessor("SplitByLine");
final ProcessorEntity reverse = getClientUtil().createProcessor("ReverseContents");
final ProcessorEntity verify = getClientUtil().createProcessor("VerifyContents");
final ProcessorEntity terminateAa = getClientUtil().createProcessor("TerminateFlowFile");
final ProcessorEntity terminateBa = getClientUtil().createProcessor("TerminateFlowFile");
final ProcessorEntity terminateCa = getClientUtil().createProcessor("TerminateFlowFile");
final ProcessorEntity terminateUnmatched = getClientUtil().createProcessor("TerminateFlowFile");
// Configure Generate
getClientUtil().updateProcessorSchedulingPeriod(generate, "10 mins");
getClientUtil().updateProcessorProperties(generate, Collections.singletonMap("Text", "{ a : a }\n{ a : b }\n{ a : c }"));
// Configure split
getClientUtil().updateProcessorProperties(split, Collections.singletonMap("Use Clone", String.valueOf(clone)));
// Configure Verify
final Map<String, String> verifyProperties = new HashMap<>();
verifyProperties.put("aa", "} a : a {");
verifyProperties.put("ba", "} b : a {");
verifyProperties.put("ca", "} c : a {");
getClientUtil().updateProcessorProperties(verify, verifyProperties);
// Configure batching for reverse
final int runDuration = useBatch ? 25 : 0;
getClientUtil().updateProcessorRunDuration(reverse, runDuration);
final ConnectionEntity generateToSplit = getClientUtil().createConnection(generate, split, "success");
final ConnectionEntity splitToReverse = getClientUtil().createConnection(split, reverse, "success");
final ConnectionEntity reverseToVerify = getClientUtil().createConnection(reverse, verify, "success");
final ConnectionEntity verifyToTerminateAa = getClientUtil().createConnection(verify, terminateAa, "aa");
final ConnectionEntity verifyToTerminateBa = getClientUtil().createConnection(verify, terminateBa, "ba");
final ConnectionEntity verifyToTerminateCa = getClientUtil().createConnection(verify, terminateCa, "ca");
final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateUnmatched, "unmatched");
// Run Generate processor, wait for its output
getNifiClient().getProcessorClient().startProcessor(generate);
waitForQueueCount(generateToSplit.getId(), 1);
// Run split processor, wait for its output
getNifiClient().getProcessorClient().startProcessor(split);
waitForQueueCount(splitToReverse.getId(), 3);
// Verify output of the Split processor
final String firstSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 0);
final String secondSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 1);
final String thirdSplitContents = getClientUtil().getFlowFileContentAsUtf8(splitToReverse.getId(), 2);
// Verify that we get all three expected outputs. We put them in a set and ensure that the set contains each one because we don't know the order
// in which they will arrive. The reason we don't know the order is that if we are using batching, the contents will be in the same output
// Content Claim; otherwise they won't be, and the order can change.
final Set<String> splitContents = new HashSet<>();
splitContents.add(firstSplitContents);
splitContents.add(secondSplitContents);
splitContents.add(thirdSplitContents);
assertTrue(splitContents.contains("{ a : a }"));
assertTrue(splitContents.contains("{ a : b }"));
assertTrue(splitContents.contains("{ a : c }"));
// Start the reverse processor, wait for its output
getNifiClient().getProcessorClient().startProcessor(reverse);
waitForQueueCount(reverseToVerify.getId(), 3);
final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
final String secondReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 1);
final String thirdReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 2);
final Set<String> reversedContents = new HashSet<>();
reversedContents.add(firstReversedContents);
reversedContents.add(secondReversedContents);
reversedContents.add(thirdReversedContents);
assertTrue(reversedContents.contains("} a : a {"));
assertTrue(reversedContents.contains("} b : a {"));
assertTrue(reversedContents.contains("} c : a {"));
// Start the Verify processor. This differs from verifying the contents above because the checks above are made via REST calls, which do not make use
// of the ProcessSession. Using the VerifyContents processor ensures that Processors see the same contents through the ProcessSession.
getNifiClient().getProcessorClient().startProcessor(verify);
waitForQueueCount(verifyToTerminateAa.getId(), 1);
waitForQueueCount(verifyToTerminateBa.getId(), 1);
waitForQueueCount(verifyToTerminateCa.getId(), 1);
waitForQueueCount(verifyToTerminateUnmatched.getId(), 0);
}
}