diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java index 0049d4cb7a..eed82ce6fd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java @@ -36,6 +36,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.flowfile.attributes.FragmentAttributes; @@ -74,9 +75,11 @@ public class SegmentContent extends AbstractProcessor { public static final PropertyDescriptor SIZE = new PropertyDescriptor.Builder() .name("Segment Size") + .displayName("Segment Size") .description("The maximum data size in bytes for each segment") .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .build(); public static final Relationship REL_SEGMENTS = new Relationship.Builder() @@ -122,7 +125,7 @@ public class SegmentContent extends AbstractProcessor { } final String segmentId = UUID.randomUUID().toString(); - final long segmentSize = context.getProperty(SIZE).asDataSize(DataUnit.B).longValue(); + final long segmentSize = context.getProperty(SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue(); final String originalFileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java index 178b43ca82..720d0c65e1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSegmentContent.java @@ -22,7 +22,9 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -70,4 +72,32 @@ public class TestSegmentContent { final MockFlowFile out1 = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS).get(0); out1.assertContentEquals(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}); } + + @Test + public void testExpressionLanguage() throws IOException { + final TestRunner testRunner = TestRunners.newTestRunner(new SegmentContent()); + Map attributes = new HashMap<>(); + attributes.put("segmentSize", "4 B"); + testRunner.setProperty(SegmentContent.SIZE, "${segmentSize}"); + testRunner.assertValid(); + + testRunner.enqueue(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}, attributes); + testRunner.run(); + + testRunner.assertTransferCount(SegmentContent.REL_ORIGINAL, 1); + final MockFlowFile originalFlowFile = testRunner.getFlowFilesForRelationship(SegmentContent.REL_ORIGINAL).get(0); + originalFlowFile.assertAttributeExists(SegmentContent.FRAGMENT_ID); + originalFlowFile.assertAttributeEquals(SegmentContent.FRAGMENT_COUNT, "3"); + + final List flowFiles = testRunner.getFlowFilesForRelationship(SegmentContent.REL_SEGMENTS); + assertEquals(3, flowFiles.size()); + + final MockFlowFile out1 = flowFiles.get(0); + final MockFlowFile out2 = flowFiles.get(1); + final MockFlowFile out3 = flowFiles.get(2); + + out1.assertContentEquals(new byte[]{1, 2, 3, 4}); + out2.assertContentEquals(new byte[]{5, 6, 7, 8}); + out3.assertContentEquals(new byte[]{9}); + } }