NIFI-2805: Add fragment attributes to SplitAvro

This closes #1046.
This commit is contained in:
Matt Burgess 2016-09-22 11:00:11 -04:00 committed by Pierre Villard
parent 937dc71aec
commit 49297b725d
2 changed files with 47 additions and 10 deletions

View File

@@ -27,7 +27,9 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import org.apache.avro.file.CodecFactory; import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants; import org.apache.avro.file.DataFileConstants;
@@ -43,11 +45,14 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
@@ -65,6 +70,15 @@ import org.apache.nifi.stream.io.BufferedOutputStream;
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " + @CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " +
"the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.") "the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
@WritesAttributes({
@WritesAttribute(attribute = "fragment.identifier",
description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index",
description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
@WritesAttribute(attribute = "fragment.count",
description = "The number of split FlowFiles generated from the parent FlowFile"),
        @WritesAttribute(attribute = "segment.original.filename", description = "The filename of the parent FlowFile")
})
public class SplitAvro extends AbstractProcessor { public class SplitAvro extends AbstractProcessor {
public static final String RECORD_SPLIT_VALUE = "Record"; public static final String RECORD_SPLIT_VALUE = "Record";
@@ -200,10 +214,18 @@ public class SplitAvro extends AbstractProcessor {
try { try {
final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter); final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter);
session.transfer(splits, REL_SPLIT); final String fragmentIdentifier = UUID.randomUUID().toString();
IntStream.range(0, splits.size()).forEach((i) -> {
FlowFile split = splits.get(i);
split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
split = session.putAttribute(split, "fragment.index", Integer.toString(i));
split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
session.transfer(split, REL_SPLIT);
});
session.transfer(flowFile, REL_ORIGINAL); session.transfer(flowFile, REL_ORIGINAL);
} catch (ProcessException e) { } catch (ProcessException e) {
getLogger().error("Failed to split {} due to {}", new Object[] {flowFile, e.getMessage()}, e); getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
} }
} }

View File

@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder; import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.DecoderFactory;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockFlowFile;
@@ -39,7 +40,11 @@ import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
public class TestSplitAvro { public class TestSplitAvro {
@@ -102,15 +107,25 @@ public class TestSplitAvro {
public void testRecordSplitDatafileOutputWithSingleRecords() throws IOException { public void testRecordSplitDatafileOutputWithSingleRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro()); final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.enqueue(users.toByteArray()); final String filename = "users.avro";
runner.enqueue(users.toByteArray(), new HashMap<String,String>() {{
put(CoreAttributes.FILENAME.key(), filename);
}});
runner.run(); runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100); runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1); runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0); runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT); final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, true); checkDataFileSplitSize(flowFiles, 1, true);
final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier");
IntStream.range(0, flowFiles.size()).forEach((i) -> {
MockFlowFile flowFile = flowFiles.get(i);
assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index")));
assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier"));
assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count")));
assertEquals(filename, flowFile.getAttribute("segment.original.filename"));
});
} }
@Test @Test
@@ -261,7 +276,7 @@ public class TestSplitAvro {
} catch (EOFException eof) { } catch (EOFException eof) {
// expected // expected
} }
Assert.assertEquals(expectedRecordsPerSplit, count); assertEquals(expectedRecordsPerSplit, count);
} }
if (checkMetadata) { if (checkMetadata) {
@@ -285,12 +300,12 @@ public class TestSplitAvro {
Assert.assertNotNull(record.get("favorite_number")); Assert.assertNotNull(record.get("favorite_number"));
count++; count++;
} }
Assert.assertEquals(expectedRecordsPerSplit, count); assertEquals(expectedRecordsPerSplit, count);
if (checkMetadata) { if (checkMetadata) {
Assert.assertEquals(META_VALUE1, reader.getMetaString(META_KEY1)); assertEquals(META_VALUE1, reader.getMetaString(META_KEY1));
Assert.assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2)); assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2));
Assert.assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8")); assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8"));
} }
} }
} }
@@ -311,7 +326,7 @@ public class TestSplitAvro {
} }
} }
} }
Assert.assertEquals(expectedTotalRecords, count); assertEquals(expectedTotalRecords, count);
} }
} }