NIFI-11914 Support Expression Language for SegmentContent Size

This closes #7578

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Mark Bean 2023-08-07 17:46:08 -04:00 committed by exceptionfactory
parent 2e6eaaeb38
commit 02653143d8
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 34 additions and 1 deletions

View File

@ -36,6 +36,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
@ -74,9 +75,11 @@ public class SegmentContent extends AbstractProcessor {
public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder()
.name("Segment Size")
.displayName("Segment Size")
.description("The maximum data size in bytes for each segment")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.required(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final Relationship REL_SEGMENTS = new Relationship.Builder()
@ -122,7 +125,7 @@ public class SegmentContent extends AbstractProcessor {
}
final String segmentId = UUID.randomUUID().toString();
final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue();
final long segmentSize = context.getProperty(SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue();
final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());

View File

@ -22,7 +22,9 @@ import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ -70,4 +72,32 @@ public class TestSegmentContent {
final MockFlowFile out1 = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS).get(0);
out1.assertContentEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9});
}
@Test
public void testExpressionLanguage() throws IOException {
final TestRunner testRunner = TestRunners.newTestRunner(new SegmentContent());
Map<String, String> attributes = new HashMap<>();
attributes.put("segmentSize", "4 B");
testRunner.setProperty(SegmentContent.SIZE, "${segmentSize}");
testRunner.assertValid();
testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}, attributes);
testRunner.run();
testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1);
final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0);
originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID);
originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "3");
final List<MockFlowFile> flowFiles = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS);
assertEquals(3, flowFiles.size());
final MockFlowFile out1 = flowFiles.get(0);
final MockFlowFile out2 = flowFiles.get(1);
final MockFlowFile out3 = flowFiles.get(2);
out1.assertContentEquals(new byte[]{1, 2, 3, 4});
out2.assertContentEquals(new byte[]{5, 6, 7, 8});
out3.assertContentEquals(new byte[]{9});
}
}