NIFI-919 Adding SplitAvro processor that splits binary datafiles into smaller datafiles, or bare records.

- Adding documentation about bare record use, renaming Split Size to Output Size, and adding a test case with 0 records
- Removing validators on properties that have allowable values, using positive integer validator for Output Size, and fixing type on processor description
This commit is contained in:
Bryan Bende 2015-09-03 14:46:44 -04:00
parent 1007999415
commit b36841e213
3 changed files with 689 additions and 1 deletions

View File

@ -0,0 +1,370 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.avro;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.util.ObjectHolder;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@SideEffectFree
@SupportsBatching
@Tags({ "avro", "split" })
@CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " +
"the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
public class SplitAvro extends AbstractProcessor {
public static final String RECORD_SPLIT_VALUE = "Record";
public static final AllowableValue RECORD_SPLIT = new AllowableValue(RECORD_SPLIT_VALUE, RECORD_SPLIT_VALUE, "Split at Record boundaries");
public static final PropertyDescriptor SPLIT_STRATEGY = new PropertyDescriptor.Builder()
.name("Split Strategy")
.description("The strategy for splitting the incoming datafile. The Record strategy will read the incoming datafile by de-serializing each record.")
.required(true)
.allowableValues(RECORD_SPLIT)
.defaultValue(RECORD_SPLIT.getValue())
.build();
public static final PropertyDescriptor OUTPUT_SIZE = new PropertyDescriptor.Builder()
.name("Output Size")
.description("The number of Avro records to include per split file. In cases where the incoming file has less records than the Output Size, or " +
"when the total number of records does not divide evenly by the Output Size, it is possible to get a split file with less records.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.required(true)
.defaultValue("1")
.build();
public static final String DATAFILE_OUTPUT_VALUE = "Datafile";
public static final String BARE_RECORD_OUTPUT_VALUE = "Bare Record";
public static final AllowableValue DATAFILE_OUTPUT = new AllowableValue(DATAFILE_OUTPUT_VALUE, DATAFILE_OUTPUT_VALUE, "Avro's object container file format");
public static final AllowableValue BARE_RECORD_OUTPUT = new AllowableValue(BARE_RECORD_OUTPUT_VALUE, BARE_RECORD_OUTPUT_VALUE, "Bare Avro records");
public static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
.name("Output Strategy")
.description("Determines the format of the output. Either Avro Datafile, or bare record. Bare record output is only intended for use with systems " +
"that already require it, and shouldn't be needed for normal use.")
.required(true)
.allowableValues(DATAFILE_OUTPUT, BARE_RECORD_OUTPUT)
.defaultValue(DATAFILE_OUTPUT.getValue())
.build();
public static final PropertyDescriptor TRANSFER_METADATA = new PropertyDescriptor.Builder()
.name("Transfer Metadata")
.description("Whether or not to transfer metadata from the parent datafile to the children. If the Output Strategy is Bare Record, " +
"then the metadata will be stored as FlowFile attributes, otherwise it will be in the Datafile header.")
.required(true)
.allowableValues("true", "false")
.defaultValue("true")
.build();
public static final Relationship REL_ORIGINAL = new Relationship.Builder()
.name("original")
.description("The original FlowFile that was split. If the FlowFile fails processing, nothing will be sent to " +
"this relationship")
.build();
public static final Relationship REL_SPLIT = new Relationship.Builder()
.name("split")
.description("All new files split from the original FlowFile will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If a FlowFile fails processing for any reason (for example, the FlowFile is not valid Avro), " +
"it will be routed to this relationship")
.build();
// Metadata keys that are not transferred to split files when output strategy is datafile
// Avro will write this key/values pairs on its own
static final Set<String> RESERVED_METADATA;
static {
Set<String> reservedMetadata = new HashSet<>();
reservedMetadata.add("avro.schema");
reservedMetadata.add("avro.codec");
RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
}
private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(SPLIT_STRATEGY);
properties.add(OUTPUT_SIZE);
properties.add(OUTPUT_STRATEGY);
properties.add(TRANSFER_METADATA);
this.properties = Collections.unmodifiableList(properties);
final Set<Relationship> relationships = new HashSet<>();
relationships.add(REL_ORIGINAL);
relationships.add(REL_SPLIT);
relationships.add(REL_FAILURE);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return properties;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final int splitSize = context.getProperty(OUTPUT_SIZE).asInteger();
final boolean transferMetadata = context.getProperty(TRANSFER_METADATA).asBoolean();
SplitWriter splitWriter;
final String outputStrategy = context.getProperty(OUTPUT_STRATEGY).getValue();
switch (outputStrategy) {
case DATAFILE_OUTPUT_VALUE:
splitWriter = new DatafileSplitWriter(transferMetadata);
break;
case BARE_RECORD_OUTPUT_VALUE:
splitWriter = new BareRecordSplitWriter();
break;
default:
throw new AssertionError();
}
Splitter splitter;
final String splitStrategy = context.getProperty(SPLIT_STRATEGY).getValue();
switch (splitStrategy) {
case RECORD_SPLIT_VALUE:
splitter = new RecordSplitter(splitSize, transferMetadata);
break;
default:
throw new AssertionError();
}
try {
final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter);
session.transfer(splits, REL_SPLIT);
session.transfer(flowFile, REL_ORIGINAL);
} catch (ProcessException e) {
getLogger().error("Failed to split {} due to {}", new Object[] {flowFile, e.getMessage()}, e);
session.transfer(flowFile, REL_FAILURE);
}
}
/**
* Able to split an incoming Avro datafile into multiple smaller FlowFiles.
*/
private interface Splitter {
List<FlowFile> split(final ProcessSession session, final FlowFile originalFlowFile, final SplitWriter splitWriter);
}
/**
* Splits the incoming Avro datafile into batches of records by reading and de-serializing each record.
*/
private class RecordSplitter implements Splitter {
private final int splitSize;
private final boolean transferMetadata;
public RecordSplitter(final int splitSize, final boolean transferMetadata) {
this.splitSize = splitSize;
this.transferMetadata = transferMetadata;
}
@Override
public List<FlowFile> split(final ProcessSession session, final FlowFile originalFlowFile, final SplitWriter splitWriter) {
final List<FlowFile> childFlowFiles = new ArrayList<>();
final ObjectHolder<GenericRecord> recordHolder = new ObjectHolder<>(null);
session.read(originalFlowFile, new InputStreamCallback() {
@Override
public void process(InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn);
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
final ObjectHolder<String> codec = new ObjectHolder<>(reader.getMetaString(DataFileConstants.CODEC));
if (codec.get() == null) {
codec.set(DataFileConstants.NULL_CODEC);
}
// while records are left, start a new split by spawning a FlowFile
while (reader.hasNext()) {
FlowFile childFlowFile = session.create(originalFlowFile);
childFlowFile = session.write(childFlowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream rawOut) throws IOException {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut)) {
splitWriter.init(reader, codec.get(), out);
// append to the current FlowFile until no more records, or splitSize is reached
int recordCount = 0;
while (reader.hasNext() && recordCount < splitSize) {
recordHolder.set(reader.next(recordHolder.get()));
splitWriter.write(recordHolder.get());
recordCount++;
}
splitWriter.flush();
} finally {
splitWriter.close();
}
}
});
// would prefer this to be part of the SplitWriter, but putting the metadata in FlowFile attributes
// can't be done inside of an OutputStream callback which is where the splitWriter is used
if (splitWriter instanceof BareRecordSplitWriter && transferMetadata) {
final Map<String,String> metadata = new HashMap<>();
for (String metaKey : reader.getMetaKeys()) {
metadata.put(metaKey, reader.getMetaString(metaKey));
}
childFlowFile = session.putAllAttributes(childFlowFile, metadata);
}
childFlowFiles.add(childFlowFile);
}
}
}
});
return childFlowFiles;
}
}
/**
* Writes records from the reader to the given output stream.
*/
private interface SplitWriter {
void init(final DataFileStream<GenericRecord> reader, final String codec, final OutputStream out) throws IOException;
void write(final GenericRecord datum) throws IOException;
void flush() throws IOException;
void close() throws IOException;
}
/**
* Writes a binary Avro Datafile to the OutputStream.
*/
private class DatafileSplitWriter implements SplitWriter {
private final boolean transferMetadata;
private DataFileWriter<GenericRecord> writer;
public DatafileSplitWriter(final boolean transferMetadata) {
this.transferMetadata = transferMetadata;
}
@Override
public void init(final DataFileStream<GenericRecord> reader, final String codec, final OutputStream out) throws IOException {
writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>());
if (transferMetadata) {
for (String metaKey : reader.getMetaKeys()) {
if (!RESERVED_METADATA.contains(metaKey)) {
writer.setMeta(metaKey, reader.getMeta(metaKey));
}
}
}
writer.setCodec(CodecFactory.fromString(codec));
writer.create(reader.getSchema(), out);
}
@Override
public void write(final GenericRecord datum) throws IOException {
writer.append(datum);
}
@Override
public void flush() throws IOException {
writer.flush();
}
@Override
public void close() throws IOException {
writer.close();
}
}
/**
* Writes bare Avro records to the OutputStream.
*/
private class BareRecordSplitWriter implements SplitWriter {
private Encoder encoder;
private DatumWriter<GenericRecord> writer;
@Override
public void init(final DataFileStream<GenericRecord> reader, final String codec, final OutputStream out) throws IOException {
writer = new GenericDatumWriter<>(reader.getSchema());
encoder = EncoderFactory.get().binaryEncoder(out, null);
}
@Override
public void write(GenericRecord datum) throws IOException {
writer.write(datum, encoder);
}
@Override
public void flush() throws IOException {
encoder.flush();
}
@Override
public void close() throws IOException {
// nothing to do
}
}
}

View File

@ -14,3 +14,4 @@
# limitations under the License. # limitations under the License.
org.apache.nifi.processors.avro.ConvertAvroToJSON org.apache.nifi.processors.avro.ConvertAvroToJSON
org.apache.nifi.processors.avro.ExtractAvroMetadata org.apache.nifi.processors.avro.ExtractAvroMetadata
org.apache.nifi.processors.avro.SplitAvro

View File

@ -0,0 +1,317 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.avro;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestSplitAvro {
static final String META_KEY1 = "metaKey1";
static final String META_KEY2 = "metaKey2";
static final String META_KEY3 = "metaKey3";
static final String META_VALUE1 = "metaValue1";
static final long META_VALUE2 = Long.valueOf(1234567);
static final String META_VALUE3 = "metaValue3";
private Schema schema;
private ByteArrayOutputStream users;
@Before
public void setup() throws IOException {
this.users = new ByteArrayOutputStream();
this.schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
createUsers(100, users);
}
void createUsers(final int numUsers, final ByteArrayOutputStream users) throws IOException {
final List<GenericRecord> userList = new ArrayList<>();
for (int i=0; i < numUsers; i++) {
final GenericRecord user = new GenericData.Record(schema);
user.put("name", "name" + i);
user.put("favorite_number", i);
userList.add(user);
}
try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
dataFileWriter.setMeta(META_KEY1, META_VALUE1);
dataFileWriter.setMeta(META_KEY2, META_VALUE2);
dataFileWriter.setMeta(META_KEY3, META_VALUE3.getBytes("UTF-8"));
dataFileWriter.create(schema, users);
for (GenericRecord user : userList) {
dataFileWriter.append(user);
}
dataFileWriter.flush();
}
}
@Test
public void testRecordSplitWithNoIncomingRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
final ByteArrayOutputStream out = new ByteArrayOutputStream();
createUsers(0, out);
runner.enqueue(out.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 0);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
}
@Test
public void testRecordSplitDatafileOutputWithSingleRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, true);
}
@Test
public void testRecordSplitDatafileOutputWithMultipleRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 20, true);
}
@Test
public void testRecordSplitDatafileOutputWithSplitSizeLarger() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_SIZE, "200");
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 1);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 100, true);
}
@Test
public void testRecordSplitDatafileOutputWithoutMetadata() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.TRANSFER_METADATA, "false");
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkDataFileSplitSize(flowFiles, 1, false);
for (final MockFlowFile flowFile : flowFiles) {
try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray());
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
Assert.assertFalse(reader.getMetaKeys().contains(META_KEY1));
Assert.assertFalse(reader.getMetaKeys().contains(META_KEY2));
Assert.assertFalse(reader.getMetaKeys().contains(META_KEY3));
}
}
}
@Test
public void testRecordSplitBareOutputWithSingleRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_STRATEGY, SplitAvro.BARE_RECORD_OUTPUT);
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 1, true);
}
@Test
public void testRecordSplitBareOutputWithMultipleRecords() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_STRATEGY, SplitAvro.BARE_RECORD_OUTPUT);
runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, true);
}
@Test
public void testRecordSplitBareOutputWithoutMetadata() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_STRATEGY, SplitAvro.BARE_RECORD_OUTPUT);
runner.setProperty(SplitAvro.OUTPUT_SIZE, "20");
runner.setProperty(SplitAvro.TRANSFER_METADATA, "false");
runner.enqueue(users.toByteArray());
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 5);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
checkBareRecordsSplitSize(flowFiles, 20, false);
for (final MockFlowFile flowFile : flowFiles) {
Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY1));
Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY2));
Assert.assertFalse(flowFile.getAttributes().containsKey(META_KEY3));
}
}
@Test
public void testFailure() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
runner.setProperty(SplitAvro.OUTPUT_SIZE, "200");
runner.enqueue("not avro".getBytes("UTF-8"));
runner.run();
runner.assertTransferCount(SplitAvro.REL_SPLIT, 0);
runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 0);
runner.assertTransferCount(SplitAvro.REL_FAILURE, 1);
}
private void checkBareRecordsSplitSize(final List<MockFlowFile> flowFiles, final int expectedRecordsPerSplit, final boolean checkMetadata) throws IOException {
for (final MockFlowFile flowFile : flowFiles) {
try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray())) {
final DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
final Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
int count = 0;
GenericRecord record = reader.read(null, decoder);
try {
while (record != null) {
Assert.assertNotNull(record.get("name"));
Assert.assertNotNull(record.get("favorite_number"));
count++;
record = reader.read(record, decoder);
}
} catch (EOFException eof) {
// expected
}
Assert.assertEquals(expectedRecordsPerSplit, count);
}
if (checkMetadata) {
Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY1));
Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY2));
Assert.assertTrue(flowFile.getAttributes().containsKey(META_KEY3));
}
}
}
private void checkDataFileSplitSize(List<MockFlowFile> flowFiles, int expectedRecordsPerSplit, boolean checkMetadata) throws IOException {
for (final MockFlowFile flowFile : flowFiles) {
try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray());
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
int count = 0;
GenericRecord record = null;
while (reader.hasNext()) {
record = reader.next(record);
Assert.assertNotNull(record.get("name"));
Assert.assertNotNull(record.get("favorite_number"));
count++;
}
Assert.assertEquals(expectedRecordsPerSplit, count);
if (checkMetadata) {
Assert.assertEquals(META_VALUE1, reader.getMetaString(META_KEY1));
Assert.assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2));
Assert.assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8"));
}
}
}
}
private void checkDataFileTotalSize(List<MockFlowFile> flowFiles, int expectedTotalRecords) throws IOException {
int count = 0;
for (final MockFlowFile flowFile : flowFiles) {
try (final ByteArrayInputStream in = new ByteArrayInputStream(flowFile.toByteArray());
final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
GenericRecord record = null;
while (reader.hasNext()) {
record = reader.next(record);
Assert.assertNotNull(record.get("name"));
Assert.assertNotNull(record.get("favorite_number"));
count++;
}
}
}
Assert.assertEquals(expectedTotalRecords, count);
}
}