diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java index 43e1e4b89a..16209c8ef7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,6 +49,7 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -84,6 +86,11 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils; @WritesAttributes({ @WritesAttribute(attribute="record.count", description="The number of records in an outgoing FlowFile"), @WritesAttribute(attribute="mime.type", description="The MIME Type that the configured Record Writer indicates is appropriate"), + @WritesAttribute(attribute = "fragment.identifier", description = "All partitioned FlowFiles produced from the same parent FlowFile will have the same randomly " + + "generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", description = "The number of partitioned FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile"), @WritesAttribute(attribute="", description = "For each dynamic property that is added, an attribute may be added to the FlowFile. See the description for Dynamic Properties for more information.") }) @@ -232,6 +239,8 @@ public class PartitionRecord extends AbstractProcessor { } // For each RecordSetWriter, finish the record set and close the writer. + int fragmentIndex = 0; + final String fragmentId = UUID.randomUUID().toString(); for (final Map.Entry entry : writerMap.entrySet()) { final RecordValueMap valueMap = entry.getKey(); final RecordSetWriter writer = entry.getValue(); @@ -244,11 +253,16 @@ public class PartitionRecord extends AbstractProcessor { attributes.putAll(writeResult.getAttributes()); attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.put(FragmentAttributes.FRAGMENT_INDEX.key(), String.valueOf(fragmentIndex)); + attributes.put(FragmentAttributes.FRAGMENT_ID.key(), fragmentId); + attributes.put(FragmentAttributes.FRAGMENT_COUNT.key(), String.valueOf(writerMap.size())); + attributes.put(FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(), flowFile.getAttribute(CoreAttributes.FILENAME.key())); FlowFile childFlowFile = valueMap.getFlowFile(); childFlowFile = session.putAllAttributes(childFlowFile, attributes); session.adjustCounter("Record Processed", writeResult.getRecordCount(), false); + fragmentIndex++; } } catch (final Exception e) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java index 99e63705a5..32546c5e62 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPartitionRecord.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; +import java.util.stream.IntStream; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordParser; @@ -80,6 +81,9 @@ public class TestPartitionRecord { assertEquals(3L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("1")).count()); assertEquals(1L, out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).count()); + out.forEach(ff -> ff.assertAttributeEquals("fragment.count", "4")); + IntStream.of(1, 3).forEach((i) -> out.get(i).assertAttributeEquals("fragment.id", out.get(0).getAttribute("fragment.id"))); + IntStream.of(0, 3).forEach((i) -> out.get(i).assertAttributeEquals("fragment.index", String.valueOf(i))); out.stream().filter(ff -> ff.getAttribute("record.count").equals("2")).forEach(ff -> ff.assertContentEquals("Jake,49,\nJake,14,\n"));