diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java new file mode 100644 index 0000000000..68e6c98342 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Scanner; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.IndexedRecord; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Responsible for converting records of one Avro type to another. Supports + * syntax like "record.field" to unpack fields and will try to do simple type + * conversion. + */ +public class AvroRecordConverter { + private final Schema inputSchema; + private final Schema outputSchema; + // Store this from output field to input field so we can look up by output. + private final Map fieldMapping; + + /** + * @param inputSchema + * Schema of input record objects + * @param outputSchema + * Schema of output record objects + * @param fieldMapping + * Map from field name in input record to field name in output + * record. + */ + public AvroRecordConverter(Schema inputSchema, Schema outputSchema, + Map fieldMapping) { + this.inputSchema = inputSchema; + this.outputSchema = outputSchema; + // Need to reverse this map. + this.fieldMapping = Maps + .newHashMapWithExpectedSize(fieldMapping.size()); + for (Map.Entry entry : fieldMapping.entrySet()) { + this.fieldMapping.put(entry.getValue(), entry.getKey()); + } + } + + /** + * @return Any fields in the output schema that are not mapped or are mapped + * by a non-existent input field. + */ + public Collection getUnmappedFields() { + List result = Lists.newArrayList(); + for (Field f : outputSchema.getFields()) { + String fieldName = f.name(); + if (fieldMapping.containsKey(fieldName)) { + fieldName = fieldMapping.get(fieldName); + } + + Schema currentSchema = inputSchema; + while (fieldName.contains(".")) { + // Recurse down the schema to find the right field. + int dotIndex = fieldName.indexOf('.'); + String entityName = fieldName.substring(0, dotIndex); + // Get the schema. In case we had an optional record, choose + // just the record. + currentSchema = getNonNullSchema(currentSchema); + if (currentSchema.getField(entityName) == null) { + // Tried to step into a schema that doesn't exist. Break out + // of the loop + break; + } + currentSchema = currentSchema.getField(entityName).schema(); + fieldName = fieldName.substring(dotIndex + 1); + } + if (currentSchema == null + || getNonNullSchema(currentSchema).getField(fieldName) == null) { + result.add(f.name()); + } + } + return result; + } + + /** + * Converts one record to another given a input and output schema plus + * explicit mappings for certain target fields. + * + * @param input + * Input record to convert conforming to the inputSchema this + * converter was created with. + * @return Record converted to the outputSchema this converter was created + * with. + * @throws AvroConversionException + * When schemas do not match or illegal conversions are + * attempted, such as when numeric data fails to parse. + */ + public Record convert(Record input) throws AvroConversionException { + Record result = new Record(outputSchema); + for (Field outputField : outputSchema.getFields()) { + // Default to matching by name + String inputFieldName = outputField.name(); + if (fieldMapping.containsKey(outputField.name())) { + inputFieldName = fieldMapping.get(outputField.name()); + } + + IndexedRecord currentRecord = input; + Schema currentSchema = getNonNullSchema(inputSchema); + while (inputFieldName.contains(".")) { + // Recurse down the schema to find the right field. + int dotIndex = inputFieldName.indexOf('.'); + String entityName = inputFieldName.substring(0, dotIndex); + // Get the record object + Object innerRecord = currentRecord.get(currentSchema.getField( + entityName).pos()); + if (innerRecord == null) { + // Probably hit a null record here. Just break out of the + // loop so that null object will be passed to convertData + // below. + currentRecord = null; + break; + } + if (innerRecord != null + && !(innerRecord instanceof IndexedRecord)) { + throw new AvroConversionException(inputFieldName + + " stepped through a non-record"); + } + currentRecord = (IndexedRecord) innerRecord; + + // Get the schema. In case we had an optional record, choose + // just the record. + currentSchema = currentSchema.getField(entityName).schema(); + currentSchema = getNonNullSchema(currentSchema); + inputFieldName = inputFieldName.substring(dotIndex + 1); + } + + // Current should now be in the right place to read the record. + Field f = currentSchema.getField(inputFieldName); + if (currentRecord == null) { + // We may have stepped into a null union type and gotten a null + // result. + Schema s = null; + if (f != null) { + s = f.schema(); + } + result.put(outputField.name(), + convertData(null, s, outputField.schema())); + } else { + result.put( + outputField.name(), + convertData(currentRecord.get(f.pos()), f.schema(), + outputField.schema())); + } + } + return result; + } + + public Schema getInputSchema() { + return inputSchema; + } + + public Schema getOutputSchema() { + return outputSchema; + } + + /** + * Converts the data from one schema to another. If the types are the same, + * no change will be made, but simple conversions will be attempted for + * other types. + * + * @param content + * The data to convert, generally taken from a field in an input + * Record. + * @param inputSchema + * The schema of the content object + * @param outputSchema + * The schema to convert to. + * @return The content object, converted to the output schema. + * @throws AvroConversionException + * When conversion is impossible, either because the output type + * is not supported or because numeric data failed to parse. + */ + private Object convertData(Object content, Schema inputSchema, + Schema outputSchema) throws AvroConversionException { + if (content == null) { + // No conversion can happen here. + if (supportsNull(outputSchema)) { + return null; + } + throw new AvroConversionException("Output schema " + outputSchema + + " does not support null"); + } + + Schema nonNillInput = getNonNullSchema(inputSchema); + Schema nonNillOutput = getNonNullSchema(outputSchema); + if (nonNillInput.getType().equals(nonNillOutput.getType())) { + return content; + } else { + if (nonNillOutput.getType() == Schema.Type.STRING) { + return content.toString(); + } + + // For the non-string cases of these, we will try to convert through + // string using Scanner to validate types. This means we could + // return questionable results when a String starts with a number + // but then contains other content + Scanner scanner = new Scanner(content.toString()); + switch (nonNillOutput.getType()) { + case LONG: + if (scanner.hasNextLong()) { + return scanner.nextLong(); + } else { + throw new AvroConversionException("Cannot convert " + + content + " to long"); + } + case INT: + if (scanner.hasNextInt()) { + return scanner.nextInt(); + } else { + throw new AvroConversionException("Cannot convert " + + content + " to int"); + } + case DOUBLE: + if (scanner.hasNextDouble()) { + return scanner.nextDouble(); + } else { + throw new AvroConversionException("Cannot convert " + + content + " to double"); + } + case FLOAT: + if (scanner.hasNextFloat()) { + return scanner.nextFloat(); + } else { + throw new AvroConversionException("Cannot convert " + + content + " to float"); + } + default: + throw new AvroConversionException("Cannot convert to type " + + nonNillOutput.getType()); + } + } + } + + /** + * If s is a union schema of some type with null, returns that type. + * Otherwise just return schema itself. + * + * Does not handle unions of schemas with anything except null and one type. + * + * @param s + * Schema to remove nillable from. + * @return The Schema of the non-null part of a the union, if the input was + * a union type. Otherwise returns the input schema. + */ + protected static Schema getNonNullSchema(Schema s) { + // Handle the case where s is a union type. Assert that this must be a + // union that only includes one non-null type. + if (s.getType() == Schema.Type.UNION) { + List types = s.getTypes(); + boolean foundOne = false; + Schema result = s; + for (Schema type : types) { + if (!type.getType().equals(Schema.Type.NULL)) { + Preconditions.checkArgument(foundOne == false, + "Cannot handle union of two non-null types"); + foundOne = true; + result = type; + } + } + return result; + } else { + return s; + } + } + + protected static boolean supportsNull(Schema s) { + if (s.getType() == Schema.Type.NULL) { + return true; + } else if (s.getType() == Schema.Type.UNION) { + for (Schema type : s.getTypes()) { + if (type.getType() == Schema.Type.NULL) { + return true; + } + } + } + return false; + } + + /** + * Exception thrown when Avro conversion fails. + */ + public class AvroConversionException extends Exception { + public AvroConversionException(String string, IOException e) { + super(string, e); + } + + public AvroConversionException(String string) { + super(string); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java new file mode 100644 index 0000000000..0d9f6586b5 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException; +import org.apache.nifi.util.LongHolder; +import org.kitesdk.data.DatasetException; +import org.kitesdk.data.DatasetIOException; +import org.kitesdk.data.SchemaNotFoundException; +import org.kitesdk.data.spi.DefaultConfiguration; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +@Tags({ "avro", "convert", "kite" }) +@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions") +@DynamicProperty(name = "Field name from input schema", +value = "Field name for output schema", +description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id") +public class ConvertAvroSchema extends AbstractKiteProcessor { + + private static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Avro content that converted successfully").build(); + + private static final Relationship FAILURE = new Relationship.Builder() + .name("failure").description("Avro content that failed to convert") + .build(); + + /** + * Makes sure the output schema is a valid output schema and that all its + * fields can be mapped either automatically or are explicitly mapped. + */ + protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String uri, + ValidationContext context) { + Configuration conf = getConfiguration(context.getProperty( + CONF_XML_FILES).getValue()); + String inputUri = context.getProperty(INPUT_SCHEMA).getValue(); + String error = null; + + final boolean elPresent = context + .isExpressionLanguageSupported(subject) + && context.isExpressionLanguagePresent(uri); + if (!elPresent) { + try { + Schema outputSchema = getSchema(uri, conf); + Schema inputSchema = getSchema(inputUri, conf); + // Get the explicitly mapped fields. This is identical to + // logic in onTrigger, but ValidationContext and + // ProcessContext share no ancestor, so we cannot generalize + // the code. + Map fieldMapping = new HashMap<>(); + for (final Map.Entry entry : context + .getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + fieldMapping.put(entry.getKey().getName(), + entry.getValue()); + } + } + AvroRecordConverter converter = new AvroRecordConverter( + inputSchema, outputSchema, fieldMapping); + Collection unmappedFields = converter + .getUnmappedFields(); + if (unmappedFields.size() > 0) { + error = "The following fields are unmapped: " + + unmappedFields; + } + + } catch (SchemaNotFoundException e) { + error = e.getMessage(); + } + } + return new ValidationResult.Builder().subject(subject).input(uri) + .explanation(error).valid(error == null).build(); + } + }; + + @VisibleForTesting + static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder() + .name("Input Schema").description("Avro Schema of Input Flowfiles") + .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true) + .required(true).build(); + + @VisibleForTesting + static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder() + .name("Output Schema") + .description("Avro Schema of Output Flowfiles") + .addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true) + .required(true).build(); + + private static final List PROPERTIES = ImmutableList + . builder() + .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA) + .add(OUTPUT_SCHEMA).build(); + + private static final Set RELATIONSHIPS = ImmutableSet + . builder().add(SUCCESS).add(FAILURE).build(); + + private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern + .compile("[A-Za-z_][A-Za-z0-9_\\.]*"); + + /** + * Validates that the input and output fields (from dynamic properties) are + * all valid avro field names including "." to step into records. + */ + protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(final String subject, + final String value, final ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) + && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject) + .input(value) + .explanation("Expression Language Present").valid(true) + .build(); + } + + String reason = ""; + if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) { + reason = subject + " is not a valid Avro fieldname"; + } + if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) { + reason = reason + value + " is not a valid Avro fieldname"; + } + + return new ValidationResult.Builder().subject(subject).input(value) + .explanation(reason).valid(reason.equals("")).build(); + } + }; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( + final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description( + "Field mapping between schemas. The property name is the field name for the input " + + "schema, and the property value is the field name for the output schema. For fields " + + "not listed, the processor tries to match names from the input to the output record.") + .dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build(); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, final ProcessSession session) + throws ProcessException { + FlowFile incomingAvro = session.get(); + if (incomingAvro == null) { + return; + } + + String inputSchemaProperty = context.getProperty(INPUT_SCHEMA) + .evaluateAttributeExpressions(incomingAvro).getValue(); + final Schema inputSchema; + try { + inputSchema = getSchema(inputSchemaProperty, + DefaultConfiguration.get()); + } catch (SchemaNotFoundException e) { + getLogger().error("Cannot find schema: " + inputSchemaProperty); + session.transfer(incomingAvro, FAILURE); + return; + } + String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA) + .evaluateAttributeExpressions(incomingAvro).getValue(); + final Schema outputSchema; + try { + outputSchema = getSchema(outputSchemaProperty, + DefaultConfiguration.get()); + } catch (SchemaNotFoundException e) { + getLogger().error("Cannot find schema: " + outputSchemaProperty); + session.transfer(incomingAvro, FAILURE); + return; + } + final Map fieldMapping = new HashMap<>(); + for (final Map.Entry entry : context + .getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + fieldMapping.put(entry.getKey().getName(), entry.getValue()); + } + } + final AvroRecordConverter converter = new AvroRecordConverter( + inputSchema, outputSchema, fieldMapping); + + final DataFileWriter writer = new DataFileWriter<>( + AvroUtil.newDatumWriter(outputSchema, Record.class)); + writer.setCodec(CodecFactory.snappyCodec()); + + final DataFileWriter failureWriter = new DataFileWriter<>( + AvroUtil.newDatumWriter(outputSchema, Record.class)); + failureWriter.setCodec(CodecFactory.snappyCodec()); + + try { + final LongHolder written = new LongHolder(0L); + final FailureTracker failures = new FailureTracker(); + + final List badRecords = Lists.newLinkedList(); + FlowFile incomingAvroCopy = session.clone(incomingAvro); + FlowFile outgoingAvro = session.write(incomingAvro, + new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) + throws IOException { + try (DataFileStream stream = new DataFileStream( + in, new GenericDatumReader( + converter.getInputSchema()))) { + try (DataFileWriter w = writer.create( + outputSchema, out)) { + for (Record record : stream) { + try { + Record converted = converter + .convert(record); + w.append(converted); + written.incrementAndGet(); + } catch (AvroConversionException e) { + failures.add(e); + getLogger().error( + "Error converting data: " + + e.getMessage()); + badRecords.add(record); + } + } + } + } + } + }); + + FlowFile badOutput = session.write(incomingAvroCopy, + new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) + throws IOException { + + try (DataFileWriter w = failureWriter + .create(inputSchema, out)) { + for (Record record : badRecords) { + w.append(record); + } + } + + } + }); + + long errors = failures.count(); + + // update only if file transfer is successful + session.adjustCounter("Converted records", written.get(), false); + // update only if file transfer is successful + session.adjustCounter("Conversion errors", errors, false); + + if (written.get() > 0L) { + session.transfer(outgoingAvro, SUCCESS); + } else { + session.remove(outgoingAvro); + + if (errors == 0L) { + badOutput = session.putAttribute(badOutput, "errors", + "No incoming records"); + session.transfer(badOutput, FAILURE); + } + } + + if (errors > 0L) { + getLogger().warn( + "Failed to convert {}/{} records between Avro Schemas", + new Object[] { errors, errors + written.get() }); + badOutput = session.putAttribute(badOutput, "errors", + failures.summary()); + session.transfer(badOutput, FAILURE); + } else { + session.remove(badOutput); + } + } catch (ProcessException | DatasetIOException e) { + getLogger().error("Failed reading or writing", e); + session.transfer(incomingAvro, FAILURE); + } catch (DatasetException e) { + getLogger().error("Failed to read FlowFile", e); + session.transfer(incomingAvro, FAILURE); + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 6de56122a6..ea99ff6548 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -15,3 +15,4 @@ org.apache.nifi.processors.kite.StoreInKiteDataset org.apache.nifi.processors.kite.ConvertCSVToAvro org.apache.nifi.processors.kite.ConvertJSONToAvro +org.apache.nifi.processors.kite.ConvertAvroSchema diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html new file mode 100644 index 0000000000..f5d8a1deb8 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html @@ -0,0 +1,142 @@ + + + + + + ConvertAvroSchema + + + + + + +

Description:

+

This processor is used to convert data between two Avro formats, such as those coming from the ConvertCSVToAvro or + ConvertJSONToAvro processors. The input and output content of the flow files should be Avro data files. The processor + includes support for the following basic type conversions: +

    +
  • Anything to String, using the data's default String representation
  • +
  • String types to numeric types int, long, double, and float
  • +
  • Conversion to and from optional Avro types
  • +
+ In addition, fields can be renamed or unpacked from a record type by using the dynamic properties. +

+

Mapping Example:

+

+ Throughout this example, we will refer to input data with the following schema: +

+{
+    "type": "record",
+    "name": "CustomerInput",
+    "namespace": "org.apache.example",
+    "fields": [
+        {
+            "name": "id",
+            "type": "string"
+        },
+        {
+            "name": "companyName",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name": "revenue",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name" : "parent",
+            "type" : [ "null", {
+              "type" : "record",
+              "name" : "parent",
+              "fields" : [ {
+                "name" : "name",
+                "type" : ["null", "string"],
+                "default" : null
+              }, {
+                "name" : "id",
+                "type" : "string"
+              } ]
+            } ],
+            "default" : null
+        }
+    ]
+}
+             
+ Where even though the revenue and id fields are mapped as string, they are logically long and double respectively. + By default, fields with matching names will be mapped automatically, so the following output schema could be converted + without using dynamic properties: +
+{
+    "type": "record",
+    "name": "SimpleCustomerOutput",
+    "namespace": "org.apache.example",
+    "fields": [
+        {
+            "name": "id",
+            "type": "long"
+        },
+        {
+            "name": "companyName",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name": "revenue",
+            "type": ["null", "double"],
+            "default": null
+        }
+    ]
+}
+             
+ To rename companyName to name and to extract the parent's id field, both a schema and a dynamic properties must be provided. + For example, to convert to the following schema: +
+{
+    "type": "record",
+    "name": "SimpleCustomerOutput",
+    "namespace": "org.apache.example",
+    "fields": [
+        {
+            "name": "id",
+            "type": "long"
+        },
+        {
+            "name": "name",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name": "revenue",
+            "type": ["null", "double"],
+            "default": null
+        },
+        {
+            "name": "parentId",
+            "type": ["null", "long"],
+            "default": null
+        }
+    ]
+}
+             
+ The following dynamic properties would be used: +
+"companyName" -> "name"
+"parent.id" -> "parentId"
+             
+

+ + diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java new file mode 100644 index 0000000000..1a4748f1b6 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import java.util.Collections; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData.Record; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +public class TestAvroRecordConverter { + final static ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + final static Map EMPTY_MAPPING = ImmutableMap.of(); + final static String NESTED_RECORD_SCHEMA_STRING = "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"NestedInput\",\n" + + " \"namespace\": \"org.apache.example\",\n" + + " \"fields\": [\n" + " {\n" + + " \"name\": \"l1\",\n" + + " \"type\": \"long\"\n" + + " },\n" + + " {\n" + " \"name\": \"s1\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + + " \"name\": \"parent\",\n" + + " \"type\": [\"null\", {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"parent\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"long\" },\n" + + " { \"name\": \"name\", \"type\": \"string\" }\n" + + " ]" + + " } ]" + + " }" + + " ] }"; + final static Schema NESTED_RECORD_SCHEMA = new Schema.Parser() + .parse(NESTED_RECORD_SCHEMA_STRING); + final static Schema NESTED_PARENT_SCHEMA = AvroRecordConverter + .getNonNullSchema(NESTED_RECORD_SCHEMA.getField("parent").schema()); + final static Schema UNNESTED_OUTPUT_SCHEMA = SchemaBuilder.record("Output") + .namespace("org.apache.example").fields().requiredLong("l1") + .requiredLong("s1").optionalLong("parentId").endRecord(); + + /** + * Tests the case where we don't use a mapping file and just map records by + * name. + */ + @Test + public void testDefaultConversion() throws Exception { + // We will convert s1 from string to long (or leave it null), ignore s2, + // convert s3 to from string to double, convert l1 from long to string, + // and leave l2 the same. + Schema input = SchemaBuilder.record("Input") + .namespace("com.cloudera.edh").fields() + .nullableString("s1", "").requiredString("s2") + .requiredString("s3").optionalLong("l1").requiredLong("l2") + .endRecord(); + Schema output = SchemaBuilder.record("Output") + .namespace("com.cloudera.edh").fields().optionalLong("s1") + .optionalString("l1").requiredLong("l2").requiredDouble("s3") + .endRecord(); + + AvroRecordConverter converter = new AvroRecordConverter(input, output, + EMPTY_MAPPING); + + Record inputRecord = new Record(input); + inputRecord.put("s1", null); + inputRecord.put("s2", "blah"); + inputRecord.put("s3", "5.5"); + inputRecord.put("l1", null); + inputRecord.put("l2", 5L); + Record outputRecord = converter.convert(inputRecord); + assertNull(outputRecord.get("s1")); + assertNull(outputRecord.get("l1")); + assertEquals(5L, outputRecord.get("l2")); + assertEquals(5.5, outputRecord.get("s3")); + + inputRecord.put("s1", "500"); + inputRecord.put("s2", "blah"); + inputRecord.put("s3", "5.5e-5"); + inputRecord.put("l1", 100L); + inputRecord.put("l2", 2L); + outputRecord = converter.convert(inputRecord); + assertEquals(500L, outputRecord.get("s1")); + assertEquals("100", outputRecord.get("l1")); + assertEquals(2L, outputRecord.get("l2")); + assertEquals(5.5e-5, outputRecord.get("s3")); + } + + /** + * Tests the case where we want to default map one field and explicitly map + * another. + */ + @Test + public void testExplicitMapping() throws Exception { + // We will convert s1 from string to long (or leave it null), ignore s2, + // convert l1 from long to string, and leave l2 the same. + Schema input = NESTED_RECORD_SCHEMA; + Schema parent = NESTED_PARENT_SCHEMA; + Schema output = UNNESTED_OUTPUT_SCHEMA; + Map mapping = ImmutableMap.of("parent.id", "parentId"); + + AvroRecordConverter converter = new AvroRecordConverter(input, output, + mapping); + + Record inputRecord = new Record(input); + inputRecord.put("l1", 5L); + inputRecord.put("s1", "1000"); + Record parentRecord = new Record(parent); + parentRecord.put("id", 200L); + parentRecord.put("name", "parent"); + inputRecord.put("parent", parentRecord); + Record outputRecord = converter.convert(inputRecord); + assertEquals(5L, outputRecord.get("l1")); + assertEquals(1000L, outputRecord.get("s1")); + assertEquals(200L, outputRecord.get("parentId")); + } + + /** + * Tests the case where we try to convert a string to a long incorrectly. + */ + @Test(expected = org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException.class) + public void testIllegalConversion() throws Exception { + // We will convert s1 from string to long (or leave it null), ignore s2, + // convert l1 from long to string, and leave l2 the same. + Schema input = SchemaBuilder.record("Input") + .namespace("com.cloudera.edh").fields() + .nullableString("s1", "").requiredString("s2") + .optionalLong("l1").requiredLong("l2").endRecord(); + Schema output = SchemaBuilder.record("Output") + .namespace("com.cloudera.edh").fields().optionalLong("s1") + .optionalString("l1").requiredLong("l2").endRecord(); + + AvroRecordConverter converter = new AvroRecordConverter(input, output, + EMPTY_MAPPING); + + Record inputRecord = new Record(input); + inputRecord.put("s1", "blah"); + inputRecord.put("s2", "blah"); + inputRecord.put("l1", null); + inputRecord.put("l2", 5L); + converter.convert(inputRecord); + } + + @Test + public void testGetUnmappedFields() throws Exception { + Schema input = SchemaBuilder.record("Input") + .namespace("com.cloudera.edh").fields() + .nullableString("s1", "").requiredString("s2") + .optionalLong("l1").requiredLong("l2").endRecord(); + Schema output = SchemaBuilder.record("Output") + .namespace("com.cloudera.edh").fields().optionalLong("field") + .endRecord(); + + // Test the case where the field isn't mapped at all. + AvroRecordConverter converter = new AvroRecordConverter(input, output, + EMPTY_MAPPING); + assertEquals(ImmutableList.of("field"), converter.getUnmappedFields()); + + // Test the case where we tried to map from a non-existent field. + converter = new AvroRecordConverter(input, output, ImmutableMap.of( + "nonExistentField", "field")); + assertEquals(ImmutableList.of("field"), converter.getUnmappedFields()); + + // Test the case where we tried to map from a non-existent record. + converter = new AvroRecordConverter(input, output, ImmutableMap.of( + "parent.nonExistentField", "field")); + assertEquals(ImmutableList.of("field"), converter.getUnmappedFields()); + + // Test a valid case + converter = new AvroRecordConverter(input, output, ImmutableMap.of( + "l2", "field")); + assertEquals(Collections.EMPTY_LIST, converter.getUnmappedFields()); + } +} diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java new file mode 100644 index 0000000000..33f3a821f9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import static org.apache.nifi.processors.kite.TestUtil.streamFor; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TestConvertAvroSchema { + + public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest") + .fields().requiredString("id").requiredString("primaryColor") + .optionalString("secondaryColor").optionalString("price") + .endRecord(); + + public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test") + .fields().requiredLong("id").requiredString("color") + .optionalDouble("price").endRecord(); + + public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]"; + + public static final String FAILURE_SUMMARY = "Cannot convert free to double"; + + @Test + public void testBasicConversion() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, + INPUT_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, + OUTPUT_SCHEMA.toString()); + runner.setProperty("primaryColor", "color"); + runner.assertValid(); + + // Two valid rows, and one invalid because "free" is not a double. + Record goodRecord1 = dataBasic("1", "blue", null, null); + Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5"); + Record badRecord = dataBasic("3", "red", "yellow", "free"); + List input = Lists.newArrayList(goodRecord1, goodRecord2, + badRecord); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 rows", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship( + "failure").get(0); + GenericDatumReader reader = new GenericDatumReader( + INPUT_SCHEMA); + DataFileStream stream = new DataFileStream( + new ByteArrayInputStream( + runner.getContentAsByteArray(incompatible)), reader); + int count = 0; + for (Record r : stream) { + Assert.assertEquals(badRecord, r); + count++; + } + stream.close(); + Assert.assertEquals(1, count); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + + GenericDatumReader successReader = new GenericDatumReader( + OUTPUT_SCHEMA); + DataFileStream successStream = new DataFileStream( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + count = 0; + for (Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertBasic(goodRecord1), r); + } else { + Assert.assertEquals(convertBasic(goodRecord2), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + + @Test + public void testNestedConversion() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, + TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, + TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString()); + runner.setProperty("parent.id", "parentId"); + runner.assertValid(); + + // Two valid rows + Record goodRecord1 = dataNested(1L, "200", null, null); + Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany"); + List input = Lists.newArrayList(goodRecord1, goodRecord2); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 0 rows", 0, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + + GenericDatumReader successReader = new GenericDatumReader( + TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA); + DataFileStream successStream = new DataFileStream( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + int count = 0; + for (Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertNested(goodRecord1), r); + } else { + Assert.assertEquals(convertNested(goodRecord2), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + + private Record convertBasic(Record inputRecord) { + Record result = new Record(OUTPUT_SCHEMA); + result.put("id", Long.parseLong(inputRecord.get("id").toString())); + result.put("color", inputRecord.get("primaryColor").toString()); + if (inputRecord.get("price") == null) { + result.put("price", null); + } else { + result.put("price", + Double.parseDouble(inputRecord.get("price").toString())); + } + return result; + } + + private Record dataBasic(String id, String primaryColor, + String secondaryColor, String price) { + Record result = new Record(INPUT_SCHEMA); + result.put("id", id); + result.put("primaryColor", primaryColor); + result.put("secondaryColor", secondaryColor); + result.put("price", price); + return result; + } + + private Record convertNested(Record inputRecord) { + Record result = new Record( + TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA); + result.put("l1", inputRecord.get("l1")); + result.put("s1", Long.parseLong(inputRecord.get("s1").toString())); + if (inputRecord.get("parent") != null) { + // output schema doesn't have parent name. + result.put("parentId", + ((Record) inputRecord.get("parent")).get("id")); + } + return result; + } + + private Record dataNested(long id, String companyName, Long parentId, + String parentName) { + Record result = new Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA); + result.put("l1", id); + result.put("s1", companyName); + if (parentId != null || parentName != null) { + Record parent = new Record( + TestAvroRecordConverter.NESTED_PARENT_SCHEMA); + parent.put("id", parentId); + parent.put("name", parentName); + result.put("parent", parent); + } + return result; + } +}