mirror of https://github.com/apache/nifi.git
NIFI-751 Add Processor To Convert Avro Formats
Implemented a new NiFi processor that allows Avro records to be converted from one Avro schema to another. This supports:

* Flattening records using "." notation like "parent.id"
* Simple type conversions to String or base primitive types
* Specifying field renames using dynamic properties

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent 8bd20510ee
commit bb64e70e6f
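Before the diff, a minimal, hypothetical usage sketch of the converter this commit adds. The AvroRecordConverter constructor, convert(), and the input-to-output field mapping come from the code below; the schemas, field names, and values are invented for illustration and are not part of the commit.

// Hypothetical usage sketch; AvroRecordConverter (and the
// AvroConversionException thrown by convert) come from this commit,
// everything else is standard Avro/Guava API.
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;

import com.google.common.collect.ImmutableMap;

import org.apache.nifi.processors.kite.AvroRecordConverter;

public class ConverterSketch {
    public static void main(String[] args) throws Exception {
        // Input has a string id; the output wants a long id and a renamed field.
        Schema input = SchemaBuilder.record("CustomerInput").fields()
                .requiredString("id").optionalString("companyName").endRecord();
        Schema output = SchemaBuilder.record("CustomerOutput").fields()
                .requiredLong("id").optionalString("name").endRecord();

        // Keys are input field names, values are output field names,
        // mirroring the processor's dynamic properties.
        Map<String, String> mapping = ImmutableMap.of("companyName", "name");

        AvroRecordConverter converter = new AvroRecordConverter(input, output,
                mapping);

        Record in = new Record(input);
        in.put("id", "42"); // the string "42" is parsed to long 42 during conversion
        in.put("companyName", "Acme");

        Record out = converter.convert(in); // {"id": 42, "name": "Acme"}
        System.out.println(out);
    }
}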
@@ -0,0 +1,320 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.nifi.processors.kite;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Scanner;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.IndexedRecord;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * Responsible for converting records of one Avro type to another. Supports
 * syntax like "record.field" to unpack fields and will try to do simple type
 * conversion.
 */
public class AvroRecordConverter {
    private final Schema inputSchema;
    private final Schema outputSchema;
    // Store this from output field to input field so we can look up by output.
    private final Map<String, String> fieldMapping;

    /**
     * @param inputSchema
     *            Schema of input record objects
     * @param outputSchema
     *            Schema of output record objects
     * @param fieldMapping
     *            Map from field name in input record to field name in output
     *            record.
     */
    public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
            Map<String, String> fieldMapping) {
        this.inputSchema = inputSchema;
        this.outputSchema = outputSchema;
        // Need to reverse this map.
        this.fieldMapping = Maps
                .newHashMapWithExpectedSize(fieldMapping.size());
        for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
            this.fieldMapping.put(entry.getValue(), entry.getKey());
        }
    }

    /**
     * @return Any fields in the output schema that are not mapped or are
     *         mapped by a non-existent input field.
     */
    public Collection<String> getUnmappedFields() {
        List<String> result = Lists.newArrayList();
        for (Field f : outputSchema.getFields()) {
            String fieldName = f.name();
            if (fieldMapping.containsKey(fieldName)) {
                fieldName = fieldMapping.get(fieldName);
            }

            Schema currentSchema = inputSchema;
            while (fieldName.contains(".")) {
                // Recurse down the schema to find the right field.
                int dotIndex = fieldName.indexOf('.');
                String entityName = fieldName.substring(0, dotIndex);
                // Get the schema. In case we had an optional record, choose
                // just the record.
                currentSchema = getNonNullSchema(currentSchema);
                if (currentSchema.getField(entityName) == null) {
                    // Tried to step into a schema that doesn't exist. Break
                    // out of the loop.
                    break;
                }
                currentSchema = currentSchema.getField(entityName).schema();
                fieldName = fieldName.substring(dotIndex + 1);
            }
            if (currentSchema == null
                    || getNonNullSchema(currentSchema).getField(fieldName) == null) {
                result.add(f.name());
            }
        }
        return result;
    }

    /**
     * Converts one record to another given an input and output schema plus
     * explicit mappings for certain target fields.
     *
     * @param input
     *            Input record to convert conforming to the inputSchema this
     *            converter was created with.
     * @return Record converted to the outputSchema this converter was created
     *         with.
     * @throws AvroConversionException
     *             When schemas do not match or illegal conversions are
     *             attempted, such as when numeric data fails to parse.
     */
    public Record convert(Record input) throws AvroConversionException {
        Record result = new Record(outputSchema);
        for (Field outputField : outputSchema.getFields()) {
            // Default to matching by name.
            String inputFieldName = outputField.name();
            if (fieldMapping.containsKey(outputField.name())) {
                inputFieldName = fieldMapping.get(outputField.name());
            }

            IndexedRecord currentRecord = input;
            Schema currentSchema = getNonNullSchema(inputSchema);
            while (inputFieldName.contains(".")) {
                // Recurse down the schema to find the right field.
                int dotIndex = inputFieldName.indexOf('.');
                String entityName = inputFieldName.substring(0, dotIndex);
                // Get the record object.
                Object innerRecord = currentRecord.get(currentSchema.getField(
                        entityName).pos());
                if (innerRecord == null) {
                    // Probably hit a null record here. Just break out of the
                    // loop so that a null object will be passed to convertData
                    // below.
                    currentRecord = null;
                    break;
                }
                if (innerRecord != null
                        && !(innerRecord instanceof IndexedRecord)) {
                    throw new AvroConversionException(inputFieldName
                            + " stepped through a non-record");
                }
                currentRecord = (IndexedRecord) innerRecord;

                // Get the schema. In case we had an optional record, choose
                // just the record.
                currentSchema = currentSchema.getField(entityName).schema();
                currentSchema = getNonNullSchema(currentSchema);
                inputFieldName = inputFieldName.substring(dotIndex + 1);
            }

            // Current should now be in the right place to read the record.
            Field f = currentSchema.getField(inputFieldName);
            if (currentRecord == null) {
                // We may have stepped into a null union type and gotten a
                // null result.
                Schema s = null;
                if (f != null) {
                    s = f.schema();
                }
                result.put(outputField.name(),
                        convertData(null, s, outputField.schema()));
            } else {
                result.put(
                        outputField.name(),
                        convertData(currentRecord.get(f.pos()), f.schema(),
                                outputField.schema()));
            }
        }
        return result;
    }

    public Schema getInputSchema() {
        return inputSchema;
    }

    public Schema getOutputSchema() {
        return outputSchema;
    }

    /**
     * Converts the data from one schema to another. If the types are the
     * same, no change will be made, but simple conversions will be attempted
     * for other types.
     *
     * @param content
     *            The data to convert, generally taken from a field in an
     *            input Record.
     * @param inputSchema
     *            The schema of the content object
     * @param outputSchema
     *            The schema to convert to.
     * @return The content object, converted to the output schema.
     * @throws AvroConversionException
     *             When conversion is impossible, either because the output
     *             type is not supported or because numeric data failed to
     *             parse.
     */
    private Object convertData(Object content, Schema inputSchema,
            Schema outputSchema) throws AvroConversionException {
        if (content == null) {
            // No conversion can happen here.
            if (supportsNull(outputSchema)) {
                return null;
            }
            throw new AvroConversionException("Output schema " + outputSchema
                    + " does not support null");
        }

        Schema nonNillInput = getNonNullSchema(inputSchema);
        Schema nonNillOutput = getNonNullSchema(outputSchema);
        if (nonNillInput.getType().equals(nonNillOutput.getType())) {
            return content;
        } else {
            if (nonNillOutput.getType() == Schema.Type.STRING) {
                return content.toString();
            }

            // For the non-string cases of these, we will try to convert
            // through string using Scanner to validate types. This means we
            // could return questionable results when a String starts with a
            // number but then contains other content.
            Scanner scanner = new Scanner(content.toString());
            switch (nonNillOutput.getType()) {
            case LONG:
                if (scanner.hasNextLong()) {
                    return scanner.nextLong();
                } else {
                    throw new AvroConversionException("Cannot convert "
                            + content + " to long");
                }
            case INT:
                if (scanner.hasNextInt()) {
                    return scanner.nextInt();
                } else {
                    throw new AvroConversionException("Cannot convert "
                            + content + " to int");
                }
            case DOUBLE:
                if (scanner.hasNextDouble()) {
                    return scanner.nextDouble();
                } else {
                    throw new AvroConversionException("Cannot convert "
                            + content + " to double");
                }
            case FLOAT:
                if (scanner.hasNextFloat()) {
                    return scanner.nextFloat();
                } else {
                    throw new AvroConversionException("Cannot convert "
                            + content + " to float");
                }
            default:
                throw new AvroConversionException("Cannot convert to type "
                        + nonNillOutput.getType());
            }
        }
    }

    /**
     * If s is a union schema of some type with null, returns that type.
     * Otherwise just returns the schema itself.
     *
     * Does not handle unions of schemas with anything except null and one
     * type.
     *
     * @param s
     *            Schema to remove nillable from.
     * @return The schema of the non-null part of the union, if the input was
     *         a union type. Otherwise returns the input schema.
     */
    protected static Schema getNonNullSchema(Schema s) {
        // Handle the case where s is a union type. Assert that this must be a
        // union that only includes one non-null type.
        if (s.getType() == Schema.Type.UNION) {
            List<Schema> types = s.getTypes();
            boolean foundOne = false;
            Schema result = s;
            for (Schema type : types) {
                if (!type.getType().equals(Schema.Type.NULL)) {
                    Preconditions.checkArgument(foundOne == false,
                            "Cannot handle union of two non-null types");
                    foundOne = true;
                    result = type;
                }
            }
            return result;
        } else {
            return s;
        }
    }

    protected static boolean supportsNull(Schema s) {
        if (s.getType() == Schema.Type.NULL) {
            return true;
        } else if (s.getType() == Schema.Type.UNION) {
            for (Schema type : s.getTypes()) {
                if (type.getType() == Schema.Type.NULL) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Exception thrown when Avro conversion fails.
     */
    public class AvroConversionException extends Exception {
        public AvroConversionException(String string, IOException e) {
            super(string, e);
        }

        public AvroConversionException(String string) {
            super(string);
        }
    }
}
@@ -0,0 +1,339 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.nifi.processors.kite;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
import org.apache.nifi.util.LongHolder;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.spi.DefaultConfiguration;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;

@Tags({ "avro", "convert", "kite" })
@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
@DynamicProperty(name = "Field name from input schema",
        value = "Field name for output schema",
        description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
public class ConvertAvroSchema extends AbstractKiteProcessor {

    private static final Relationship SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Avro content that converted successfully").build();

    private static final Relationship FAILURE = new Relationship.Builder()
            .name("failure").description("Avro content that failed to convert")
            .build();

    /**
     * Makes sure the output schema is a valid output schema and that all its
     * fields can be mapped either automatically or are explicitly mapped.
     */
    protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(String subject, String uri,
                ValidationContext context) {
            Configuration conf = getConfiguration(context.getProperty(
                    CONF_XML_FILES).getValue());
            String inputUri = context.getProperty(INPUT_SCHEMA).getValue();
            String error = null;

            final boolean elPresent = context
                    .isExpressionLanguageSupported(subject)
                    && context.isExpressionLanguagePresent(uri);
            if (!elPresent) {
                try {
                    Schema outputSchema = getSchema(uri, conf);
                    Schema inputSchema = getSchema(inputUri, conf);
                    // Get the explicitly mapped fields. This is identical to
                    // logic in onTrigger, but ValidationContext and
                    // ProcessContext share no ancestor, so we cannot
                    // generalize the code.
                    Map<String, String> fieldMapping = new HashMap<>();
                    for (final Map.Entry<PropertyDescriptor, String> entry : context
                            .getProperties().entrySet()) {
                        if (entry.getKey().isDynamic()) {
                            fieldMapping.put(entry.getKey().getName(),
                                    entry.getValue());
                        }
                    }
                    AvroRecordConverter converter = new AvroRecordConverter(
                            inputSchema, outputSchema, fieldMapping);
                    Collection<String> unmappedFields = converter
                            .getUnmappedFields();
                    if (unmappedFields.size() > 0) {
                        error = "The following fields are unmapped: "
                                + unmappedFields;
                    }

                } catch (SchemaNotFoundException e) {
                    error = e.getMessage();
                }
            }
            return new ValidationResult.Builder().subject(subject).input(uri)
                    .explanation(error).valid(error == null).build();
        }
    };

    @VisibleForTesting
    static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
            .name("Input Schema").description("Avro Schema of Input Flowfiles")
            .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true)
            .required(true).build();

    @VisibleForTesting
    static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder()
            .name("Output Schema")
            .description("Avro Schema of Output Flowfiles")
            .addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true)
            .required(true).build();

    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
            .<PropertyDescriptor> builder()
            .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
            .add(OUTPUT_SCHEMA).build();

    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
            .<Relationship> builder().add(SUCCESS).add(FAILURE).build();

    private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern
            .compile("[A-Za-z_][A-Za-z0-9_\\.]*");

    /**
     * Validates that the input and output fields (from dynamic properties)
     * are all valid Avro field names, including "." to step into records.
     */
    protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator() {
        @Override
        public ValidationResult validate(final String subject,
                final String value, final ValidationContext context) {
            if (context.isExpressionLanguageSupported(subject)
                    && context.isExpressionLanguagePresent(value)) {
                return new ValidationResult.Builder().subject(subject)
                        .input(value)
                        .explanation("Expression Language Present").valid(true)
                        .build();
            }

            String reason = "";
            if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) {
                reason = subject + " is not a valid Avro fieldname";
            }
            if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) {
                reason = reason + value + " is not a valid Avro fieldname";
            }

            return new ValidationResult.Builder().subject(subject).input(value)
                    .explanation(reason).valid(reason.equals("")).build();
        }
    };

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
            final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .description(
                        "Field mapping between schemas. The property name is the field name for the input "
                                + "schema, and the property value is the field name for the output schema. For fields "
                                + "not listed, the processor tries to match names from the input to the output record.")
                .dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override
    public void onTrigger(ProcessContext context, final ProcessSession session)
            throws ProcessException {
        FlowFile incomingAvro = session.get();
        if (incomingAvro == null) {
            return;
        }

        String inputSchemaProperty = context.getProperty(INPUT_SCHEMA)
                .evaluateAttributeExpressions(incomingAvro).getValue();
        final Schema inputSchema;
        try {
            inputSchema = getSchema(inputSchemaProperty,
                    DefaultConfiguration.get());
        } catch (SchemaNotFoundException e) {
            getLogger().error("Cannot find schema: " + inputSchemaProperty);
            session.transfer(incomingAvro, FAILURE);
            return;
        }
        String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA)
                .evaluateAttributeExpressions(incomingAvro).getValue();
        final Schema outputSchema;
        try {
            outputSchema = getSchema(outputSchemaProperty,
                    DefaultConfiguration.get());
        } catch (SchemaNotFoundException e) {
            getLogger().error("Cannot find schema: " + outputSchemaProperty);
            session.transfer(incomingAvro, FAILURE);
            return;
        }
        final Map<String, String> fieldMapping = new HashMap<>();
        for (final Map.Entry<PropertyDescriptor, String> entry : context
                .getProperties().entrySet()) {
            if (entry.getKey().isDynamic()) {
                fieldMapping.put(entry.getKey().getName(), entry.getValue());
            }
        }
        final AvroRecordConverter converter = new AvroRecordConverter(
                inputSchema, outputSchema, fieldMapping);

        final DataFileWriter<Record> writer = new DataFileWriter<>(
                AvroUtil.newDatumWriter(outputSchema, Record.class));
        writer.setCodec(CodecFactory.snappyCodec());

        final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
                AvroUtil.newDatumWriter(outputSchema, Record.class));
        failureWriter.setCodec(CodecFactory.snappyCodec());

        try {
            final LongHolder written = new LongHolder(0L);
            final FailureTracker failures = new FailureTracker();

            final List<Record> badRecords = Lists.newLinkedList();
            FlowFile incomingAvroCopy = session.clone(incomingAvro);
            FlowFile outgoingAvro = session.write(incomingAvro,
                    new StreamCallback() {
                        @Override
                        public void process(InputStream in, OutputStream out)
                                throws IOException {
                            try (DataFileStream<Record> stream = new DataFileStream<Record>(
                                    in, new GenericDatumReader<Record>(
                                            converter.getInputSchema()))) {
                                try (DataFileWriter<Record> w = writer.create(
                                        outputSchema, out)) {
                                    for (Record record : stream) {
                                        try {
                                            Record converted = converter
                                                    .convert(record);
                                            w.append(converted);
                                            written.incrementAndGet();
                                        } catch (AvroConversionException e) {
                                            failures.add(e);
                                            getLogger().error(
                                                    "Error converting data: "
                                                            + e.getMessage());
                                            badRecords.add(record);
                                        }
                                    }
                                }
                            }
                        }
                    });

            FlowFile badOutput = session.write(incomingAvroCopy,
                    new StreamCallback() {
                        @Override
                        public void process(InputStream in, OutputStream out)
                                throws IOException {

                            try (DataFileWriter<Record> w = failureWriter
                                    .create(inputSchema, out)) {
                                for (Record record : badRecords) {
                                    w.append(record);
                                }
                            }

                        }
                    });

            long errors = failures.count();

            // update only if file transfer is successful
            session.adjustCounter("Converted records", written.get(), false);
            // update only if file transfer is successful
            session.adjustCounter("Conversion errors", errors, false);

            if (written.get() > 0L) {
                session.transfer(outgoingAvro, SUCCESS);
            } else {
                session.remove(outgoingAvro);

                if (errors == 0L) {
                    badOutput = session.putAttribute(badOutput, "errors",
                            "No incoming records");
                    session.transfer(badOutput, FAILURE);
                }
            }

            if (errors > 0L) {
                getLogger().warn(
                        "Failed to convert {}/{} records between Avro Schemas",
                        new Object[] { errors, errors + written.get() });
                badOutput = session.putAttribute(badOutput, "errors",
                        failures.summary());
                session.transfer(badOutput, FAILURE);
            } else {
                session.remove(badOutput);
            }
        } catch (ProcessException | DatasetIOException e) {
            getLogger().error("Failed reading or writing", e);
            session.transfer(incomingAvro, FAILURE);
        } catch (DatasetException e) {
            getLogger().error("Failed to read FlowFile", e);
            session.transfer(incomingAvro, FAILURE);
        }
    }
}
@@ -15,3 +15,4 @@
 org.apache.nifi.processors.kite.StoreInKiteDataset
 org.apache.nifi.processors.kite.ConvertCSVToAvro
 org.apache.nifi.processors.kite.ConvertJSONToAvro
+org.apache.nifi.processors.kite.ConvertAvroSchema
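For context, this service descriptor follows the standard java.util.ServiceLoader convention; assuming the file above is META-INF/services/org.apache.nifi.processor.Processor on the classpath, the framework can discover the newly registered processor at runtime. A minimal, hypothetical sketch of that discovery (not part of the commit):

// Hypothetical discovery sketch: lists every Processor implementation
// registered via META-INF/services, which after this change should
// include org.apache.nifi.processors.kite.ConvertAvroSchema.
import java.util.ServiceLoader;

import org.apache.nifi.processor.Processor;

public class ListRegisteredProcessors {
    public static void main(String[] args) {
        for (Processor p : ServiceLoader.load(Processor.class)) {
            System.out.println(p.getClass().getName());
        }
    }
}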
@@ -0,0 +1,142 @@
<!DOCTYPE html>
<html lang="en">
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->
<head>
    <meta charset="utf-8" />
    <title>ConvertAvroSchema</title>

    <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>

<body>
    <!-- Processor Documentation ================================================== -->
    <h2>Description:</h2>
    <p>This processor is used to convert data between two Avro formats, such as those coming from the <code>ConvertCSVToAvro</code> or
        <code>ConvertJSONToAvro</code> processors. The input and output content of the flow files should be Avro data files. The processor
        includes support for the following basic type conversions:
        <ul>
            <li>Anything to String, using the data's default String representation</li>
            <li>String types to numeric types int, long, double, and float</li>
            <li>Conversion to and from optional Avro types</li>
        </ul>
        In addition, fields can be renamed or unpacked from a record type by using the dynamic properties.
    </p>
    <h2>Mapping Example:</h2>
    <p>
        Throughout this example, we will refer to input data with the following schema:
        <pre>
{
    "type": "record",
    "name": "CustomerInput",
    "namespace": "org.apache.example",
    "fields": [
        {
            "name": "id",
            "type": "string"
        },
        {
            "name": "companyName",
            "type": ["null", "string"],
            "default": null
        },
        {
            "name": "revenue",
            "type": ["null", "string"],
            "default": null
        },
        {
            "name" : "parent",
            "type" : [ "null", {
                "type" : "record",
                "name" : "parent",
                "fields" : [ {
                    "name" : "name",
                    "type" : ["null", "string"],
                    "default" : null
                }, {
                    "name" : "id",
                    "type" : "string"
                } ]
            } ],
            "default" : null
        }
    ]
}
        </pre>
        Note that even though the revenue and id fields are mapped as string, they are logically double and long, respectively.
        By default, fields with matching names will be mapped automatically, so the following output schema could be converted
        without using dynamic properties:
        <pre>
{
    "type": "record",
    "name": "SimpleCustomerOutput",
    "namespace": "org.apache.example",
    "fields": [
        {
            "name": "id",
            "type": "long"
        },
        {
            "name": "companyName",
            "type": ["null", "string"],
            "default": null
        },
        {
            "name": "revenue",
            "type": ["null", "double"],
            "default": null
        }
    ]
}
        </pre>
        To rename companyName to name and to extract the parent's id field, both a schema and dynamic properties must be provided.
        For example, to convert to the following schema:
        <pre>
{
    "type": "record",
    "name": "SimpleCustomerOutput",
    "namespace": "org.apache.example",
    "fields": [
        {
            "name": "id",
            "type": "long"
        },
        {
            "name": "name",
            "type": ["null", "string"],
            "default": null
        },
        {
            "name": "revenue",
            "type": ["null", "double"],
            "default": null
        },
        {
            "name": "parentId",
            "type": ["null", "long"],
            "default": null
        }
    ]
}
        </pre>
        The following dynamic properties would be used:
        <pre>
"companyName" -> "name"
"parent.id" -> "parentId"
        </pre>
    </p>
</body>
</html>
@@ -0,0 +1,201 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.nifi.processors.kite;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class TestAvroRecordConverter {
    final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    final static Map<String, String> EMPTY_MAPPING = ImmutableMap.of();
    final static String NESTED_RECORD_SCHEMA_STRING = "{\n"
            + "  \"type\": \"record\",\n"
            + "  \"name\": \"NestedInput\",\n"
            + "  \"namespace\": \"org.apache.example\",\n"
            + "  \"fields\": [\n" + "    {\n"
            + "      \"name\": \"l1\",\n"
            + "      \"type\": \"long\"\n"
            + "    },\n"
            + "    {\n" + "      \"name\": \"s1\",\n"
            + "      \"type\": \"string\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"parent\",\n"
            + "      \"type\": [\"null\", {\n"
            + "        \"type\": \"record\",\n"
            + "        \"name\": \"parent\",\n"
            + "        \"fields\": [\n"
            + "          { \"name\": \"id\", \"type\": \"long\" },\n"
            + "          { \"name\": \"name\", \"type\": \"string\" }\n"
            + "        ]"
            + "      } ]"
            + "    }"
            + "  ] }";
    final static Schema NESTED_RECORD_SCHEMA = new Schema.Parser()
            .parse(NESTED_RECORD_SCHEMA_STRING);
    final static Schema NESTED_PARENT_SCHEMA = AvroRecordConverter
            .getNonNullSchema(NESTED_RECORD_SCHEMA.getField("parent").schema());
    final static Schema UNNESTED_OUTPUT_SCHEMA = SchemaBuilder.record("Output")
            .namespace("org.apache.example").fields().requiredLong("l1")
            .requiredLong("s1").optionalLong("parentId").endRecord();

    /**
     * Tests the case where we don't use a mapping file and just map records
     * by name.
     */
    @Test
    public void testDefaultConversion() throws Exception {
        // We will convert s1 from string to long (or leave it null), ignore
        // s2, convert s3 from string to double, convert l1 from long to
        // string, and leave l2 the same.
        Schema input = SchemaBuilder.record("Input")
                .namespace("com.cloudera.edh").fields()
                .nullableString("s1", "").requiredString("s2")
                .requiredString("s3").optionalLong("l1").requiredLong("l2")
                .endRecord();
        Schema output = SchemaBuilder.record("Output")
                .namespace("com.cloudera.edh").fields().optionalLong("s1")
                .optionalString("l1").requiredLong("l2").requiredDouble("s3")
                .endRecord();

        AvroRecordConverter converter = new AvroRecordConverter(input, output,
                EMPTY_MAPPING);

        Record inputRecord = new Record(input);
        inputRecord.put("s1", null);
        inputRecord.put("s2", "blah");
        inputRecord.put("s3", "5.5");
        inputRecord.put("l1", null);
        inputRecord.put("l2", 5L);
        Record outputRecord = converter.convert(inputRecord);
        assertNull(outputRecord.get("s1"));
        assertNull(outputRecord.get("l1"));
        assertEquals(5L, outputRecord.get("l2"));
        assertEquals(5.5, outputRecord.get("s3"));

        inputRecord.put("s1", "500");
        inputRecord.put("s2", "blah");
        inputRecord.put("s3", "5.5e-5");
        inputRecord.put("l1", 100L);
        inputRecord.put("l2", 2L);
        outputRecord = converter.convert(inputRecord);
        assertEquals(500L, outputRecord.get("s1"));
        assertEquals("100", outputRecord.get("l1"));
        assertEquals(2L, outputRecord.get("l2"));
        assertEquals(5.5e-5, outputRecord.get("s3"));
    }

    /**
     * Tests the case where we want to default map one field and explicitly
     * map another.
     */
    @Test
    public void testExplicitMapping() throws Exception {
        // We will convert s1 from string to long, leave l1 as a long, and
        // unpack parent.id into parentId.
        Schema input = NESTED_RECORD_SCHEMA;
        Schema parent = NESTED_PARENT_SCHEMA;
        Schema output = UNNESTED_OUTPUT_SCHEMA;
        Map<String, String> mapping = ImmutableMap.of("parent.id", "parentId");

        AvroRecordConverter converter = new AvroRecordConverter(input, output,
                mapping);

        Record inputRecord = new Record(input);
        inputRecord.put("l1", 5L);
        inputRecord.put("s1", "1000");
        Record parentRecord = new Record(parent);
        parentRecord.put("id", 200L);
        parentRecord.put("name", "parent");
        inputRecord.put("parent", parentRecord);
        Record outputRecord = converter.convert(inputRecord);
        assertEquals(5L, outputRecord.get("l1"));
        assertEquals(1000L, outputRecord.get("s1"));
        assertEquals(200L, outputRecord.get("parentId"));
    }

    /**
     * Tests the case where we try to convert a string to a long incorrectly.
     */
    @Test(expected = org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException.class)
    public void testIllegalConversion() throws Exception {
        // We will convert s1 from string to long (or leave it null), ignore
        // s2, convert l1 from long to string, and leave l2 the same.
        Schema input = SchemaBuilder.record("Input")
                .namespace("com.cloudera.edh").fields()
                .nullableString("s1", "").requiredString("s2")
                .optionalLong("l1").requiredLong("l2").endRecord();
        Schema output = SchemaBuilder.record("Output")
                .namespace("com.cloudera.edh").fields().optionalLong("s1")
                .optionalString("l1").requiredLong("l2").endRecord();

        AvroRecordConverter converter = new AvroRecordConverter(input, output,
                EMPTY_MAPPING);

        Record inputRecord = new Record(input);
        inputRecord.put("s1", "blah");
        inputRecord.put("s2", "blah");
        inputRecord.put("l1", null);
        inputRecord.put("l2", 5L);
        converter.convert(inputRecord);
    }

    @Test
    public void testGetUnmappedFields() throws Exception {
        Schema input = SchemaBuilder.record("Input")
                .namespace("com.cloudera.edh").fields()
                .nullableString("s1", "").requiredString("s2")
                .optionalLong("l1").requiredLong("l2").endRecord();
        Schema output = SchemaBuilder.record("Output")
                .namespace("com.cloudera.edh").fields().optionalLong("field")
                .endRecord();

        // Test the case where the field isn't mapped at all.
        AvroRecordConverter converter = new AvroRecordConverter(input, output,
                EMPTY_MAPPING);
        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());

        // Test the case where we tried to map from a non-existent field.
        converter = new AvroRecordConverter(input, output, ImmutableMap.of(
                "nonExistentField", "field"));
        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());

        // Test the case where we tried to map from a non-existent record.
        converter = new AvroRecordConverter(input, output, ImmutableMap.of(
                "parent.nonExistentField", "field"));
        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());

        // Test a valid case.
        converter = new AvroRecordConverter(input, output, ImmutableMap.of(
                "l2", "field"));
        assertEquals(Collections.EMPTY_LIST, converter.getUnmappedFields());
    }
}
@@ -0,0 +1,216 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.nifi.processors.kite;

import static org.apache.nifi.processors.kite.TestUtil.streamFor;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.collect.Lists;

public class TestConvertAvroSchema {

    public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
            .fields().requiredString("id").requiredString("primaryColor")
            .optionalString("secondaryColor").optionalString("price")
            .endRecord();

    public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
            .fields().requiredLong("id").requiredString("color")
            .optionalDouble("price").endRecord();

    public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";

    public static final String FAILURE_SUMMARY = "Cannot convert free to double";

    @Test
    public void testBasicConversion() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
        runner.assertNotValid();
        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
                INPUT_SCHEMA.toString());
        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
                OUTPUT_SCHEMA.toString());
        runner.setProperty("primaryColor", "color");
        runner.assertValid();

        // Two valid rows, and one invalid because "free" is not a double.
        Record goodRecord1 = dataBasic("1", "blue", null, null);
        Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
        Record badRecord = dataBasic("3", "red", "yellow", "free");
        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
                badRecord);

        runner.enqueue(streamFor(input));
        runner.run();

        long converted = runner.getCounterValue("Converted records");
        long errors = runner.getCounterValue("Conversion errors");
        Assert.assertEquals("Should convert 2 rows", 2, converted);
        Assert.assertEquals("Should reject 1 row", 1, errors);

        runner.assertTransferCount("success", 1);
        runner.assertTransferCount("failure", 1);

        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
                "failure").get(0);
        GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
                INPUT_SCHEMA);
        DataFileStream<Record> stream = new DataFileStream<Record>(
                new ByteArrayInputStream(
                        runner.getContentAsByteArray(incompatible)), reader);
        int count = 0;
        for (Record r : stream) {
            Assert.assertEquals(badRecord, r);
            count++;
        }
        stream.close();
        Assert.assertEquals(1, count);
        Assert.assertEquals("Should accumulate error messages",
                FAILURE_SUMMARY, incompatible.getAttribute("errors"));

        GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
                OUTPUT_SCHEMA);
        DataFileStream<Record> successStream = new DataFileStream<Record>(
                new ByteArrayInputStream(runner.getContentAsByteArray(runner
                        .getFlowFilesForRelationship("success").get(0))),
                successReader);
        count = 0;
        for (Record r : successStream) {
            if (count == 0) {
                Assert.assertEquals(convertBasic(goodRecord1), r);
            } else {
                Assert.assertEquals(convertBasic(goodRecord2), r);
            }
            count++;
        }
        successStream.close();
        Assert.assertEquals(2, count);
    }

    @Test
    public void testNestedConversion() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
        runner.assertNotValid();
        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
                TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
        runner.setProperty("parent.id", "parentId");
        runner.assertValid();

        // Two valid rows
        Record goodRecord1 = dataNested(1L, "200", null, null);
        Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2);

        runner.enqueue(streamFor(input));
        runner.run();

        long converted = runner.getCounterValue("Converted records");
        long errors = runner.getCounterValue("Conversion errors");
        Assert.assertEquals("Should convert 2 rows", 2, converted);
        Assert.assertEquals("Should reject 0 rows", 0, errors);

        runner.assertTransferCount("success", 1);
        runner.assertTransferCount("failure", 0);

        GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
        DataFileStream<Record> successStream = new DataFileStream<Record>(
                new ByteArrayInputStream(runner.getContentAsByteArray(runner
                        .getFlowFilesForRelationship("success").get(0))),
                successReader);
        int count = 0;
        for (Record r : successStream) {
            if (count == 0) {
                Assert.assertEquals(convertNested(goodRecord1), r);
            } else {
                Assert.assertEquals(convertNested(goodRecord2), r);
            }
            count++;
        }
        successStream.close();
        Assert.assertEquals(2, count);
    }

    private Record convertBasic(Record inputRecord) {
        Record result = new Record(OUTPUT_SCHEMA);
        result.put("id", Long.parseLong(inputRecord.get("id").toString()));
        result.put("color", inputRecord.get("primaryColor").toString());
        if (inputRecord.get("price") == null) {
            result.put("price", null);
        } else {
            result.put("price",
                    Double.parseDouble(inputRecord.get("price").toString()));
        }
        return result;
    }

    private Record dataBasic(String id, String primaryColor,
            String secondaryColor, String price) {
        Record result = new Record(INPUT_SCHEMA);
        result.put("id", id);
        result.put("primaryColor", primaryColor);
        result.put("secondaryColor", secondaryColor);
        result.put("price", price);
        return result;
    }

    private Record convertNested(Record inputRecord) {
        Record result = new Record(
                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
        result.put("l1", inputRecord.get("l1"));
        result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
        if (inputRecord.get("parent") != null) {
            // The output schema doesn't have the parent's name field.
            result.put("parentId",
                    ((Record) inputRecord.get("parent")).get("id"));
        }
        return result;
    }

    private Record dataNested(long id, String companyName, Long parentId,
            String parentName) {
        Record result = new Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
        result.put("l1", id);
        result.put("s1", companyName);
        if (parentId != null || parentName != null) {
            Record parent = new Record(
                    TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
            parent.put("id", parentId);
            parent.put("name", parentName);
            result.put("parent", parent);
        }
        return result;
    }
}