From b36841e2134fc2d3aea0cafdf84ea9b0bafd3e50 Mon Sep 17 00:00:00 2001
From: Bryan Bende
Date: Thu, 3 Sep 2015 14:46:44 -0400
Subject: [PATCH] NIFI-919 Adding SplitAvro processor that splits binary
 datafiles into smaller datafiles, or bare records.

- Adding documentation about bare record use, renaming Split Size to Output Size, and adding a test case with 0 records
- Removing validators on properties that have allowable values, using positive integer validator for Output Size, and fixing typo in processor description
---
 .../nifi/processors/avro/SplitAvro.java       | 370 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor      |   3 +-
 .../nifi/processors/avro/TestSplitAvro.java   | 317 +++++++++++++++
 3 files changed, 689 insertions(+), 1 deletion(-)
 create mode 100644 nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
 create mode 100644 nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java

diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
new file mode 100644
index 0000000000..3b344b5d4c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.util.ObjectHolder;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@SideEffectFree
+@SupportsBatching
+@Tags({ "avro", "split" })
+@CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines whether "
+        + "the smaller files will be Avro datafiles or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
+public class SplitAvro extends AbstractProcessor {
+
+    public static final String RECORD_SPLIT_VALUE = "Record";
+    public static final AllowableValue RECORD_SPLIT = new AllowableValue(RECORD_SPLIT_VALUE, RECORD_SPLIT_VALUE, "Split at Record boundaries");
+
+    public static final PropertyDescriptor SPLIT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Split Strategy")
+            .description("The strategy for splitting the incoming datafile. The Record strategy will read the incoming datafile by de-serializing each record.")
+            .required(true)
+            .allowableValues(RECORD_SPLIT)
+            .defaultValue(RECORD_SPLIT.getValue())
+            .build();
+
+    public static final PropertyDescriptor OUTPUT_SIZE = new PropertyDescriptor.Builder()
+            .name("Output Size")
+            .description("The number of Avro records to include per split file. "
+                    + "In cases where the incoming file has fewer records than the Output Size, or "
+                    + "when the total number of records does not divide evenly by the Output Size, it is possible to get a split file with fewer records.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .required(true)
+            .defaultValue("1")
+            .build();
+
+    public static final String DATAFILE_OUTPUT_VALUE = "Datafile";
+    public static final String BARE_RECORD_OUTPUT_VALUE = "Bare Record";
+
+    public static final AllowableValue DATAFILE_OUTPUT = new AllowableValue(DATAFILE_OUTPUT_VALUE, DATAFILE_OUTPUT_VALUE, "Avro's object container file format");
+    public static final AllowableValue BARE_RECORD_OUTPUT = new AllowableValue(BARE_RECORD_OUTPUT_VALUE, BARE_RECORD_OUTPUT_VALUE, "Bare Avro records");
+
+    public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("Output Strategy")
+            .description("Determines the format of the output: either an Avro Datafile or bare records. Bare record output is only intended for use with systems "
+                    + "that already require it, and shouldn't be needed for normal use.")
+            .required(true)
+            .allowableValues(DATAFILE_OUTPUT, BARE_RECORD_OUTPUT)
+            .defaultValue(DATAFILE_OUTPUT.getValue())
+            .build();
+
+    public static final PropertyDescriptor TRANSFER_METADATA = new PropertyDescriptor.Builder()
+            .name("Transfer Metadata")
+            .description("Whether or not to transfer metadata from the parent datafile to the children. If the Output Strategy is Bare Record, "
+                    + "then the metadata will be stored as FlowFile attributes, otherwise it will be in the Datafile header.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+
+    public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+            .name("original")
+            .description("The original FlowFile that was split. If the FlowFile fails processing, nothing will be sent to "
+                    + "this relationship")
+            .build();
+    public static final Relationship REL_SPLIT = new Relationship.Builder()
+            .name("split")
+            .description("All new files split from the original FlowFile will be routed to this relationship")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid Avro), "
+                    + "it will be routed to this relationship")
+            .build();
+
+    // Metadata keys that are not transferred to split files when the output strategy is Datafile.
+    // Avro will write these key/value pairs on its own.
+    static final Set<String> RESERVED_METADATA;
+    static {
+        Set<String> reservedMetadata = new HashSet<>();
+        reservedMetadata.add("avro.schema");
+        reservedMetadata.add("avro.codec");
+        RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
+    }
+
+    private List<PropertyDescriptor> properties;
+    private Set<Relationship> relationships;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(SPLIT_STRATEGY);
+        properties.add(OUTPUT_SIZE);
+        properties.add(OUTPUT_STRATEGY);
+        properties.add(TRANSFER_METADATA);
+        this.properties = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_ORIGINAL);
+        relationships.add(REL_SPLIT);
+        relationships.add(REL_FAILURE);
+        this.relationships = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final int splitSize = context.getProperty(OUTPUT_SIZE).asInteger();
+        final boolean transferMetadata = context.getProperty(TRANSFER_METADATA).asBoolean();
+
+        SplitWriter splitWriter;
+        final String outputStrategy = context.getProperty(OUTPUT_STRATEGY).getValue();
+        switch (outputStrategy) {
+            case DATAFILE_OUTPUT_VALUE:
+                splitWriter = new DatafileSplitWriter(transferMetadata);
+                break;
+            case BARE_RECORD_OUTPUT_VALUE:
+                splitWriter = new BareRecordSplitWriter();
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        Splitter splitter;
+        final String splitStrategy = context.getProperty(SPLIT_STRATEGY).getValue();
+        switch (splitStrategy) {
+            case RECORD_SPLIT_VALUE:
+                splitter = new RecordSplitter(splitSize, transferMetadata);
+                break;
+            default:
+                throw new AssertionError();
+        }
+
+        try {
+            final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter);
+            session.transfer(splits, REL_SPLIT);
+            session.transfer(flowFile, REL_ORIGINAL);
+        } catch (ProcessException e) {
+            getLogger().error("Failed to split {} due to {}", new Object[] {flowFile, e.getMessage()}, e);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+
+    /**
+     * Splits an incoming Avro datafile into multiple smaller FlowFiles.
+     */
+    private interface Splitter {
+        List<FlowFile> split(final ProcessSession session, final FlowFile originalFlowFile, final SplitWriter splitWriter);
+    }
+
+    /**
+     * Splits the incoming Avro datafile into batches of records by reading and de-serializing each record.
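+     * Each batch is written to a new child FlowFile; the final batch may contain fewer than splitSize records when the record count does not divide evenly.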
+     */
+    private class RecordSplitter implements Splitter {
+
+        private final int splitSize;
+        private final boolean transferMetadata;
+
+        public RecordSplitter(final int splitSize, final boolean transferMetadata) {
+            this.splitSize = splitSize;
+            this.transferMetadata = transferMetadata;
+        }
+
+        @Override
+        public List<FlowFile> split(final ProcessSession session, final FlowFile originalFlowFile, final SplitWriter splitWriter) {
+            final List<FlowFile> childFlowFiles = new ArrayList<>();
+            final ObjectHolder<GenericRecord> recordHolder = new ObjectHolder<>(null);
+
+            session.read(originalFlowFile, new InputStreamCallback() {
+                @Override
+                public void process(InputStream rawIn) throws IOException {
+                    try (final InputStream in = new BufferedInputStream(rawIn);
+                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                        final ObjectHolder<String> codec = new ObjectHolder<>(reader.getMetaString(DataFileConstants.CODEC));
+                        if (codec.get() == null) {
+                            codec.set(DataFileConstants.NULL_CODEC);
+                        }
+
+                        // while records are left, start a new split by spawning a FlowFile
+                        while (reader.hasNext()) {
+                            FlowFile childFlowFile = session.create(originalFlowFile);
+                            childFlowFile = session.write(childFlowFile, new OutputStreamCallback() {
+                                @Override
+                                public void process(OutputStream rawOut) throws IOException {
+                                    try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
+                                        splitWriter.init(reader, codec.get(), out);
+
+                                        // append to the current FlowFile until no more records, or splitSize is reached
+                                        int recordCount = 0;
+                                        while (reader.hasNext() && recordCount < splitSize) {
+                                            recordHolder.set(reader.next(recordHolder.get()));
+                                            splitWriter.write(recordHolder.get());
+                                            recordCount++;
+                                        }
+                                        splitWriter.flush();
+                                    } finally {
+                                        splitWriter.close();
+                                    }
+                                }
+                            });
+
+                            // would prefer this to be part of the SplitWriter, but putting the metadata in FlowFile attributes
+                            // can't be done inside of an OutputStream callback which is where the splitWriter is used
+                            if (splitWriter instanceof BareRecordSplitWriter && transferMetadata) {
+                                final Map<String, String> metadata = new HashMap<>();
+                                for (String metaKey : reader.getMetaKeys()) {
+                                    metadata.put(metaKey, reader.getMetaString(metaKey));
+                                }
+                                childFlowFile = session.putAllAttributes(childFlowFile, metadata);
+                            }
+
+                            childFlowFiles.add(childFlowFile);
+                        }
+                    }
+                }
+            });
+
+            return childFlowFiles;
+        }
+    }
+
+    /**
+     * Writes records from the reader to the given output stream.
+     */
+    private interface SplitWriter {
+        void init(final DataFileStream<GenericRecord> reader, final String codec, final OutputStream out) throws IOException;
+        void write(final GenericRecord datum) throws IOException;
+        void flush() throws IOException;
+        void close() throws IOException;
+    }
+
+    /**
+     * Writes a binary Avro Datafile to the OutputStream.
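+     * The schema and compression codec of the incoming datafile are preserved, and any non-reserved metadata is carried over when Transfer Metadata is true.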
+     */
+    private class DatafileSplitWriter implements SplitWriter {
+
+        private final boolean transferMetadata;
+        private DataFileWriter<GenericRecord> writer;
+
+        public DatafileSplitWriter(final boolean transferMetadata) {
+            this.transferMetadata = transferMetadata;
+        }
+
+        @Override
+        public void init(final DataFileStream<GenericRecord> reader, final String codec, final OutputStream out) throws IOException {
+            writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
+
+            if (transferMetadata) {
+                for (String metaKey : reader.getMetaKeys()) {
+                    if (!RESERVED_METADATA.contains(metaKey)) {
+                        writer.setMeta(metaKey, reader.getMeta(metaKey));
+                    }
+                }
+            }
+
+            writer.setCodec(CodecFactory.fromString(codec));
+            writer.create(reader.getSchema(), out);
+        }
+
+        @Override
+        public void write(final GenericRecord datum) throws IOException {
+            writer.append(datum);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            writer.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            writer.close();
+        }
+    }
+
+    /**
+     * Writes bare Avro records to the OutputStream.
+     */
+    private class BareRecordSplitWriter implements SplitWriter {
+        private Encoder encoder;
+        private DatumWriter<GenericRecord> writer;
+
+        @Override
+        public void init(final DataFileStream<GenericRecord> reader, final String codec, final OutputStream out) throws IOException {
+            writer = new GenericDatumWriter<>(reader.getSchema());
+            encoder = EncoderFactory.get().binaryEncoder(out, null);
+        }
+
+        @Override
+        public void write(GenericRecord datum) throws IOException {
+            writer.write(datum, encoder);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            encoder.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            // nothing to do
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 192ec00f39..7ab13fa694 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.avro.ConvertAvroToJSON
-org.apache.nifi.processors.avro.ExtractAvroMetadata
\ No newline at end of file
+org.apache.nifi.processors.avro.ExtractAvroMetadata
+org.apache.nifi.processors.avro.SplitAvro
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
new file mode 100644
index 0000000000..73da818523
--- /dev/null
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.nifi.stream.io.ByteArrayInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestSplitAvro {
+
+    static final String META_KEY1 = "metaKey1";
+    static final String META_KEY2 = "metaKey2";
+    static final String META_KEY3 = "metaKey3";
+
+    static final String META_VALUE1 = "metaValue1";
+    static final long META_VALUE2 = Long.valueOf(1234567);
+    static final String META_VALUE3 = "metaValue3";
+
+    private Schema schema;
+    private ByteArrayOutputStream users;
+
+    @Before
+    public void setup() throws IOException {
+        this.users = new ByteArrayOutputStream();
+        this.schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+        createUsers(100, users);
+    }
+
+    void createUsers(final int numUsers, final ByteArrayOutputStream users) throws IOException {
+        final List<GenericRecord> userList = new ArrayList<>();
+        for (int i = 0; i < numUsers; i++) {
+            final GenericRecord user = new GenericData.Record(schema);
+            user.put("name", "name" + i);
+            user.put("favorite_number", i);
+            userList.add(user);
+        }
+
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
+            dataFileWriter.setMeta(META_KEY1, META_VALUE1);
+            dataFileWriter.setMeta(META_KEY2, META_VALUE2);
+            dataFileWriter.setMeta(META_KEY3, META_VALUE3.getBytes("UTF-8"));
+
+            dataFileWriter.create(schema, users);
+            for (GenericRecord user : userList) {
+                dataFileWriter.append(user);
+            }
+            dataFileWriter.flush();
+        }
+    }
+
+    @Test
+    public void testRecordSplitWithNoIncomingRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+        createUsers(0, out);
+
+        runner.enqueue(out.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 0);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithSingleRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 1, true);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithMultipleRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 20, true);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithSplitSizeLarger() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "200");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 1);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 100, true);
+    }
+
+    @Test
+    public void testRecordSplitDatafileOutputWithoutMetadata() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.TRANSFER_METADATA, "false");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+        checkDataFileSplitSize(flowFiles, 1, false);
+
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+                 final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+                Assert.assertFalse(reader.getMetaKeys().contains(META_KEY1));
+                Assert.assertFalse(reader.getMetaKeys().contains(META_KEY2));
+                Assert.assertFalse(reader.getMetaKeys().contains(META_KEY3));
+            }
+        }
+    }
+
+    @Test
+    public void testRecordSplitBareOutputWithSingleRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_STRATEGY, SplitAvro.BARE_RECORD_OUTPUT);
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+
+        checkBareRecordsSplitSize(flowFiles, 1, true);
+    }
+
+    @Test
+    public void testRecordSplitBareOutputWithMultipleRecords() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_STRATEGY, SplitAvro.BARE_RECORD_OUTPUT);
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+
+        checkBareRecordsSplitSize(flowFiles, 20, true);
+    }
+
+    @Test
+    public void testRecordSplitBareOutputWithoutMetadata() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_STRATEGY, SplitAvro.BARE_RECORD_OUTPUT);
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
+        runner.setProperty(SplitAvro.TRANSFER_METADATA, "false");
+
+        runner.enqueue(users.toByteArray());
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
+
+        checkBareRecordsSplitSize(flowFiles, 20, false);
+
+        for (final MockFlowFile flowFile : flowFiles) {
+            Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY1));
+            Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY2));
+            Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY3));
+        }
+    }
+
+    @Test
+    public void testFailure() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
+        runner.setProperty(SplitAvro.OUTPUT_SIZE, "200");
+
+        runner.enqueue("not avro".getBytes("UTF-8"));
+        runner.run();
+
+        runner.assertTransferCount(SplitAvro.REL_SPLIT, 0);
+        runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 0);
+        runner.assertTransferCount(SplitAvro.REL_FAILURE, 1);
+    }
+
+    private void checkBareRecordsSplitSize(final List<MockFlowFile> flowFiles, final int expectedRecordsPerSplit, final boolean checkMetadata) throws IOException {
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray())) {
+                final DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+                final Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+
+                int count = 0;
+                GenericRecord record = reader.read(null, decoder);
+                try {
+                    while (record != null) {
+                        Assert.assertNotNull(record.get("name"));
+                        Assert.assertNotNull(record.get("favorite_number"));
+                        count++;
+                        record = reader.read(record, decoder);
+                    }
+                } catch (EOFException eof) {
+                    // expected
+                }
+                Assert.assertEquals(expectedRecordsPerSplit, count);
+            }
+
+            if (checkMetadata) {
+                Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY1));
+                Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY2));
+                Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY3));
+            }
+        }
+    }
+
+    private void checkDataFileSplitSize(List<MockFlowFile> flowFiles, int expectedRecordsPerSplit, boolean checkMetadata) throws IOException {
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+                 final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                int count = 0;
+                GenericRecord record = null;
+                while (reader.hasNext()) {
+                    record = reader.next(record);
+                    Assert.assertNotNull(record.get("name"));
+                    Assert.assertNotNull(record.get("favorite_number"));
+                    count++;
+                }
+                Assert.assertEquals(expectedRecordsPerSplit, count);
+
+                if (checkMetadata) {
+                    Assert.assertEquals(META_VALUE1, reader.getMetaString(META_KEY1));
+                    Assert.assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2));
+                    Assert.assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8"));
+                }
+            }
+        }
+    }
+
+    private void checkDataFileTotalSize(List<MockFlowFile> flowFiles, int expectedTotalRecords) throws IOException {
+        int count = 0;
+        for (final MockFlowFile flowFile : flowFiles) {
+            try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray());
+                 final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+
+                GenericRecord record = null;
+                while (reader.hasNext()) {
+                    record = reader.next(record);
+                    Assert.assertNotNull(record.get("name"));
+                    Assert.assertNotNull(record.get("favorite_number"));
+                    count++;
+                }
+            }
+        }
+        Assert.assertEquals(expectedTotalRecords, count);
+    }
+
+}
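--

A minimal downstream sketch, not part of the patch itself: when Output Strategy is
"Bare Record" and Transfer Metadata is true, each split carries the parent datafile's
metadata as FlowFile attributes, including the "avro.schema" key, so a consumer can
rebuild the schema and deserialize the records without the container format. The
helper name below is illustrative; the Avro classes are the same ones the test above
already imports:

    // Hypothetical helper: reads every record from one "Bare Record" split,
    // given the JSON schema string from the split's "avro.schema" attribute.
    static void readBareRecords(final String schemaJson, final InputStream in) throws IOException {
        final Schema schema = new Schema.Parser().parse(schemaJson);
        final DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        final Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
        try {
            GenericRecord record = null;
            while (true) {
                record = reader.read(record, decoder); // throws EOFException when exhausted
                // process the record here
            }
        } catch (final EOFException eof) {
            // expected once all records in the split have been read
        }
    }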