NIFI-1156

This commit is contained in:
Jeremy Dyer 2016-01-05 20:53:48 -05:00
parent 8d37af07b9
commit 8966643d48
5 changed files with 744 additions and 0 deletions

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.kitesdk.data.spi.filesystem.CSVProperties;
import org.kitesdk.data.spi.filesystem.CSVUtil;
import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
@Tags({"kite", "csv", "avro", "infer", "schema"})
@SeeAlso({InferAvroSchemaFromJSON.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Creates an Avro schema from a CSV file header. The header line definition can either be provided" +
        " as a property to the processor OR present in the first line of CSV in the incoming FlowFile content. If a header" +
        " property is specified for this processor no attempt will be made to use the header line that may be present" +
        " in the incoming CSV FlowFile content.")
public class InferAvroSchemaFromCSV extends AbstractKiteProcessor {

    /** Delimiter assumed when parsing the incoming CSV content. */
    public static final String CSV_DELIMITER = ",";

    public static final PropertyDescriptor HEADER_LINE = new PropertyDescriptor.Builder()
            .name("CSV Header Line")
            .description("Comma separated string defining the column names expected in the CSV data. " +
                    "EX: \"fname,lname,zip,address\"")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder()
            .name("CSV Header Line Skip Count")
            .description("Specifies the number of header lines that should be skipped when reading the CSV data. If the " +
                    "first line of the CSV data is a header line and you specify the \"CSV Header Line\" property " +
                    "you need to set this value to 1 otherwise the header line will be treated as actual data.")
            .required(true)
            .defaultValue("0")
            // The value is consumed via asInteger() below, so validate it as a
            // non-negative integer rather than merely non-empty.
            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
            .build();

    public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder()
            .name("CSV escape string")
            .description("String that represents an escape sequence in the CSV FlowFile content data.")
            .required(true)
            .defaultValue("\\")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder()
            .name("CSV quote string")
            .description("String that represents a literal quote character in the CSV FlowFile content data.")
            .required(true)
            .defaultValue("'")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
            .name("Avro Record Name")
            .description("Value to be placed in the Avro record schema \"name\" field.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("Charset")
            .description("Character encoding of CSV data.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
            .name("Pretty Avro Output")
            .description("If true the Avro output will be formatted.")
            .required(true)
            .defaultValue("true")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
            .description("Successfully created Avro schema for CSV data.").build();

    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
            .description("Original incoming FlowFile CSV data").build();

    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
            .description("Failed to create Avro schema for CSV data.").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(HEADER_LINE);
        properties.add(HEADER_LINE_SKIP_COUNT);
        properties.add(ESCAPE_STRING);
        properties.add(QUOTE_STRING);
        properties.add(PRETTY_AVRO_OUTPUT);
        properties.add(RECORD_NAME);
        properties.add(CHARSET);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_ORIGINAL);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Infers an Avro schema for the incoming CSV FlowFile. The header is taken from the
     * "CSV Header Line" property when set; otherwise the first line of the FlowFile
     * content is used. The resulting schema is written to a new FlowFile routed to
     * "success" while the untouched input is routed to "original". Any error routes the
     * input to "failure".
     */
    @Override
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        try {
            // Determine the header line either from the property input or from the
            // first line of the delimited FlowFile content.
            final AtomicReference<String> header = new AtomicReference<>();
            final AtomicReference<Boolean> hasHeader = new AtomicReference<>();

            if (context.getProperty(HEADER_LINE).isSet()) {
                header.set(context.getProperty(HEADER_LINE).getValue());
                hasHeader.set(Boolean.FALSE);
            } else {
                // Read the first line of the content to obtain the header value.
                session.read(original, new InputStreamCallback() {
                    @Override
                    public void process(InputStream in) throws IOException {
                        // Decode using the configured charset (not the platform default)
                        // and close the reader even if readLine() throws.
                        try (BufferedReader br = new BufferedReader(
                                new InputStreamReader(in, context.getProperty(CHARSET).getValue()))) {
                            header.set(br.readLine());
                            hasHeader.set(Boolean.TRUE);
                        }
                    }
                });
            }

            // Prepare the CSVProperties for Kite's schema inference.
            final CSVProperties props = new CSVProperties.Builder()
                    .delimiter(CSV_DELIMITER)
                    .escape(context.getProperty(ESCAPE_STRING).getValue())
                    .quote(context.getProperty(QUOTE_STRING).getValue())
                    .header(header.get())
                    .hasHeader(hasHeader.get())
                    .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).asInteger())
                    .charset(context.getProperty(CHARSET).getValue())
                    .build();

            final Set<String> required = ImmutableSet.of();
            final AtomicReference<String> avroSchema = new AtomicReference<>();

            session.read(original, new InputStreamCallback() {
                @Override
                public void process(InputStream in) throws IOException {
                    avroSchema.set(CSVUtil
                            .inferNullableSchema(
                                    context.getProperty(RECORD_NAME).getValue(), in, props, required)
                            .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
                }
            });

            FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    // Encode the schema text with the configured charset.
                    out.write(avroSchema.get().getBytes(context.getProperty(CHARSET).getValue()));
                }
            });

            // Transfer the FlowFiles.
            session.transfer(original, REL_ORIGINAL);
            session.transfer(avroSchemaFF, REL_SUCCESS);
        } catch (Exception ex) {
            // Log the throwable itself so the stack trace is preserved, not just the message.
            getLogger().error("Failed to infer Avro schema for incoming CSV FlowFile", ex);
            session.transfer(original, REL_FAILURE);
        }
    }
}

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.kitesdk.data.spi.JsonUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.HashSet;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
@Tags({"kite", "json", "avro", "infer", "schema"})
@SeeAlso({InferAvroSchemaFromCSV.class})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Creates an Avro schema from JSON data. The Avro schema is inferred by examining the fields " +
        "in the JSON input. The Avro schema generated by kite will use the same names present in the incoming JSON payload")
public class InferAvroSchemaFromJSON extends AbstractKiteProcessor {

    public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
            .name("Avro Record Name")
            .description("Value to be placed in the Avro record schema \"name\" field.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
            .name("Number of records to analyze")
            .description("Number of records that should be analyzed by kite to infer the Avro schema")
            .required(true)
            .defaultValue("10")
            .addValidator(StandardValidators.INTEGER_VALIDATOR)
            .build();

    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
            .name("Charset")
            .description("Character encoding of JSON data.")
            .required(true)
            .defaultValue("UTF-8")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
            .name("Pretty Avro Output")
            .description("If true the Avro output will be formatted.")
            .required(true)
            .defaultValue("true")
            .allowableValues("true", "false")
            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
            .description("Successfully created Avro schema for JSON data.").build();

    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
            .description("Original incoming FlowFile JSON data").build();

    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
            .description("Failed to create Avro schema for JSON data.").build();

    private List<PropertyDescriptor> properties;
    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(CHARSET);
        properties.add(PRETTY_AVRO_OUTPUT);
        properties.add(RECORD_NAME);
        properties.add(NUM_RECORDS_TO_ANALYZE);
        this.properties = Collections.unmodifiableList(properties);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_ORIGINAL);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Infers an Avro schema from the incoming JSON FlowFile using Kite's JsonUtil,
     * examining at most NUM_RECORDS_TO_ANALYZE records. The schema is written to a new
     * FlowFile routed to "success"; the untouched input is routed to "original". Any
     * error routes the input to "failure".
     */
    @Override
    public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
        final FlowFile original = session.get();
        if (original == null) {
            return;
        }

        try {
            final AtomicReference<String> avroSchema = new AtomicReference<>();
            session.read(original, new InputStreamCallback() {
                @Override
                public void process(InputStream in) throws IOException {
                    Schema as = JsonUtil.inferSchema(
                            in, context.getProperty(RECORD_NAME).getValue(),
                            context.getProperty(NUM_RECORDS_TO_ANALYZE).asInteger());
                    avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
                }
            });

            FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
                @Override
                public void process(OutputStream out) throws IOException {
                    // Encode the schema text with the configured charset rather than
                    // the platform default.
                    out.write(avroSchema.get().getBytes(context.getProperty(CHARSET).getValue()));
                }
            });

            // Transfer the FlowFiles.
            session.transfer(original, REL_ORIGINAL);
            session.transfer(avroSchemaFF, REL_SUCCESS);
        } catch (Exception ex) {
            // Log the throwable itself so the stack trace is preserved, not just the message.
            getLogger().error("Failed to infer Avro schema for incoming JSON FlowFile", ex);
            session.transfer(original, REL_FAILURE);
        }
    }
}

View File

@ -16,3 +16,5 @@ org.apache.nifi.processors.kite.StoreInKiteDataset
org.apache.nifi.processors.kite.ConvertCSVToAvro
org.apache.nifi.processors.kite.ConvertJSONToAvro
org.apache.nifi.processors.kite.ConvertAvroSchema
org.apache.nifi.processors.kite.InferAvroSchemaFromCSV
org.apache.nifi.processors.kite.InferAvroSchemaFromJSON

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
 * Unit tests for {@code InferAvroSchemaFromCSV}, covering header-from-content,
 * header-from-property, and empty-content failure scenarios.
 */
public class TestInferAvroSchemaFromCSV {

    private final String CSV_HEADER_LINE = "fname,lname,age,zip";

    @Test
    public void inferSchemaFromHeaderLineOfCSV() throws Exception {
        TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);

        // RECORD_NAME has no default value, so the processor starts out invalid.
        runner.assertNotValid();
        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "0");
        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
        runner.assertValid();

        ProcessSession session = runner.getProcessSessionFactory().createSession();
        FlowFile ff = session.write(session.create(), new OutputStreamCallback() {
            @Override
            public void process(OutputStream out) throws IOException {
                // Header line plus one data row, encoded to match the CHARSET property.
                out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes("UTF-8"));
            }
        });

        // Enqueue the FlowFile whose first line carries the header.
        runner.enqueue(ff);
        runner.run();

        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
    }

    @Test
    public void inferSchemaFromHeaderLinePropertyOfProcessor() throws Exception {
        TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);

        runner.assertNotValid();
        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE);
        // Content still starts with a header line, so skip it to avoid treating it as data.
        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
        runner.assertValid();

        ProcessSession session = runner.getProcessSessionFactory().createSession();
        FlowFile ff = session.write(session.create(), new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes("UTF-8"));
            }
        });

        // Enqueue the FlowFile containing the CSV content.
        runner.enqueue(ff);
        runner.run();

        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
    }

    @Test
    public void inferSchemaFromEmptyContent() throws Exception {
        TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);

        runner.assertNotValid();
        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE);
        runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
        runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
        runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
        runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
        runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
        runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
        runner.assertValid();

        ProcessSession session = runner.getProcessSessionFactory().createSession();
        FlowFile ff = session.write(session.create(), new StreamCallback() {
            @Override
            public void process(InputStream in, OutputStream out) throws IOException {
                out.write("".getBytes("UTF-8"));
            }
        });

        // Enqueue the genuinely empty FlowFile; schema inference should fail.
        runner.enqueue(ff);
        runner.run();

        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 1);
        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 0);
        runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 0);
    }
}

View File

@ -0,0 +1,215 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
/**
 * Unit tests for {@code InferAvroSchemaFromJSON}.
 *
 * NOTE(review): the previous contents of this class were a copy of the
 * ConvertAvroSchema tests and never exercised InferAvroSchemaFromJSON at all.
 * These tests mirror the structure of TestInferAvroSchemaFromCSV and target the
 * processor this class is named for.
 */
public class TestInferAvroSchemaFromJSON {

    private final String JSON_CONTENT = "{\"fname\":\"Jeremy\",\"lname\":\"Dyer\",\"age\":29,\"zip\":\"55555\"}";

    @Test
    public void inferSchemaFromJSONContent() throws Exception {
        TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromJSON.class);

        // RECORD_NAME has no default value, so the processor starts out invalid.
        runner.assertNotValid();
        runner.setProperty(InferAvroSchemaFromJSON.RECORD_NAME, "contact");
        runner.setProperty(InferAvroSchemaFromJSON.NUM_RECORDS_TO_ANALYZE, "10");
        runner.setProperty(InferAvroSchemaFromJSON.CHARSET, "UTF-8");
        runner.setProperty(InferAvroSchemaFromJSON.PRETTY_AVRO_OUTPUT, "true");
        runner.assertValid();

        runner.enqueue(JSON_CONTENT.getBytes("UTF-8"));
        runner.run();

        runner.assertTransferCount(InferAvroSchemaFromJSON.REL_FAILURE, 0);
        runner.assertTransferCount(InferAvroSchemaFromJSON.REL_ORIGINAL, 1);
        runner.assertTransferCount(InferAvroSchemaFromJSON.REL_SUCCESS, 1);
    }

    @Test
    public void inferSchemaFromEmptyContent() throws Exception {
        TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromJSON.class);

        runner.assertNotValid();
        runner.setProperty(InferAvroSchemaFromJSON.RECORD_NAME, "contact");
        runner.setProperty(InferAvroSchemaFromJSON.NUM_RECORDS_TO_ANALYZE, "10");
        runner.setProperty(InferAvroSchemaFromJSON.CHARSET, "UTF-8");
        runner.setProperty(InferAvroSchemaFromJSON.PRETTY_AVRO_OUTPUT, "true");
        runner.assertValid();

        // Empty content cannot yield a schema; the processor should route to failure.
        runner.enqueue("".getBytes("UTF-8"));
        runner.run();

        runner.assertTransferCount(InferAvroSchemaFromJSON.REL_FAILURE, 1);
        runner.assertTransferCount(InferAvroSchemaFromJSON.REL_ORIGINAL, 0);
        runner.assertTransferCount(InferAvroSchemaFromJSON.REL_SUCCESS, 0);
    }
}