From 49297b725d4770040c2cc2c01514ef49b2680c88 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 22 Sep 2016 11:00:11 -0400 Subject: [PATCH] NIFI-2805: Add fragment attributes to SplitAvro This closes #1046. --- .../nifi/processors/avro/SplitAvro.java | 26 ++++++++++++++-- .../nifi/processors/avro/TestSplitAvro.java | 31 ++++++++++++++----- 2 files changed, 47 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java index ac6936f1d4..e3eb6ec577 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java @@ -27,7 +27,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileConstants; @@ -43,11 +45,14 @@ import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -65,6 +70,15 @@ import org.apache.nifi.stream.io.BufferedOutputStream; @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " + "the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.") +@WritesAttributes({ + @WritesAttribute(attribute = "fragment.identifier", + description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"), + @WritesAttribute(attribute = "fragment.index", + description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"), + @WritesAttribute(attribute = "fragment.count", + description = "The number of split FlowFiles generated from the parent FlowFile"), + @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile") +}) public class SplitAvro extends AbstractProcessor { public static final String RECORD_SPLIT_VALUE = "Record"; @@ -200,10 +214,18 @@ public class SplitAvro extends AbstractProcessor { try { final List splits = splitter.split(session, flowFile, splitWriter); - session.transfer(splits, REL_SPLIT); + final String fragmentIdentifier = UUID.randomUUID().toString(); + IntStream.range(0, splits.size()).forEach((i) -> { + FlowFile split = splits.get(i); + split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier); + split = session.putAttribute(split, "fragment.index", Integer.toString(i)); + split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key())); + split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size())); + session.transfer(split, REL_SPLIT); + }); session.transfer(flowFile, REL_ORIGINAL); } catch (ProcessException e) { - getLogger().error("Failed to split {} due to {}", new Object[] {flowFile, e.getMessage()}, e); + getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e); session.transfer(flowFile, REL_FAILURE); } } diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java index 73da818523..32d43e306b 100644 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java @@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; +import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; @@ -39,7 +40,11 @@ import java.io.EOFException; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; public class TestSplitAvro { @@ -102,15 +107,25 @@ public class TestSplitAvro { public void testRecordSplitDatafileOutputWithSingleRecords() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new SplitAvro()); - runner.enqueue(users.toByteArray()); + final String filename = "users.avro"; + runner.enqueue(users.toByteArray(), new HashMap() {{ + put(CoreAttributes.FILENAME.key(), filename); + }}); runner.run(); runner.assertTransferCount(SplitAvro.REL_SPLIT, 100); runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); - final List flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); checkDataFileSplitSize(flowFiles, 1, true); + final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier"); + IntStream.range(0, flowFiles.size()).forEach((i) -> { + MockFlowFile flowFile = flowFiles.get(i); + assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index"))); + assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier")); + assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count"))); + assertEquals(filename, flowFile.getAttribute("segment.original.filename")); + }); } @Test @@ -261,7 +276,7 @@ public class TestSplitAvro { } catch (EOFException eof) { // expected } - Assert.assertEquals(expectedRecordsPerSplit, count); + assertEquals(expectedRecordsPerSplit, count); } if (checkMetadata) { @@ -285,12 +300,12 @@ public class TestSplitAvro { Assert.assertNotNull(record.get("favorite_number")); count++; } - Assert.assertEquals(expectedRecordsPerSplit, count); + assertEquals(expectedRecordsPerSplit, count); if (checkMetadata) { - Assert.assertEquals(META_VALUE1, reader.getMetaString(META_KEY1)); - Assert.assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2)); - Assert.assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8")); + assertEquals(META_VALUE1, reader.getMetaString(META_KEY1)); + assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2)); + assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8")); } } } @@ -311,7 +326,7 @@ public class TestSplitAvro { } } } - Assert.assertEquals(expectedTotalRecords, count); + assertEquals(expectedTotalRecords, count); } }