From bb64e70e6fbda12a9e0388b5c2240d96d34ac6bf Mon Sep 17 00:00:00 2001
From: Alan Jackoway
Date: Tue, 7 Jul 2015 17:28:26 -0400
Subject: [PATCH] NIFI-751 Add Processor To Convert Avro Formats
Implemented a new NiFi processor that allows Avro records to be converted from one Avro schema
to another. This supports:
* Flattening records using "." notation, like "parent.id"
* Simple type conversions to String or basic primitive types
* Specifying field renames using dynamic properties
Signed-off-by: joewitt
---
.../processors/kite/AvroRecordConverter.java | 320 +++++++++++++++++
.../processors/kite/ConvertAvroSchema.java | 339 ++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 142 ++++++++
.../kite/TestAvroRecordConverter.java | 201 +++++++++++
.../kite/TestConvertAvroSchema.java | 216 +++++++++++
6 files changed, 1219 insertions(+)
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
create mode 100644 nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
new file mode 100644
index 0000000000..68e6c98342
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Responsible for converting records of one Avro type to another. Supports
+ * syntax like "record.field" to unpack fields and will try to do simple type
+ * conversion.
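+ *
+ * <p>
+ * A minimal usage sketch (illustrative; assumes an input and output schema
+ * are already loaded, and renames the input field "companyName" to the
+ * output field "name"):
+ * </p>
+ *
+ * <pre>
+ * Map&lt;String, String&gt; mapping = Maps.newHashMap();
+ * mapping.put("companyName", "name");
+ * AvroRecordConverter converter = new AvroRecordConverter(inputSchema,
+ * outputSchema, mapping);
+ * Record converted = converter.convert(inputRecord);
+ * </pre>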
+ */
+public class AvroRecordConverter {
+ private final Schema inputSchema;
+ private final Schema outputSchema;
+ // Store this from output field to input field so we can look up by output.
+ private final Map<String, String> fieldMapping;
+
+ /**
+ * @param inputSchema
+ * Schema of input record objects
+ * @param outputSchema
+ * Schema of output record objects
+ * @param fieldMapping
+ * Map from field name in input record to field name in output
+ * record.
+ */
+ public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
+ Map<String, String> fieldMapping) {
+ this.inputSchema = inputSchema;
+ this.outputSchema = outputSchema;
+ // Need to reverse this map.
+ this.fieldMapping = Maps
+ .newHashMapWithExpectedSize(fieldMapping.size());
+ for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
+ this.fieldMapping.put(entry.getValue(), entry.getKey());
+ }
+ }
+
+ /**
+ * @return Any fields in the output schema that are not mapped or are mapped
+ * by a non-existent input field.
+ */
+ public Collection<String> getUnmappedFields() {
+ List<String> result = Lists.newArrayList();
+ for (Field f : outputSchema.getFields()) {
+ String fieldName = f.name();
+ if (fieldMapping.containsKey(fieldName)) {
+ fieldName = fieldMapping.get(fieldName);
+ }
+
+ Schema currentSchema = inputSchema;
+ while (fieldName.contains(".")) {
+ // Recurse down the schema to find the right field.
+ int dotIndex = fieldName.indexOf('.');
+ String entityName = fieldName.substring(0, dotIndex);
+ // Get the schema. In case we had an optional record, choose
+ // just the record.
+ currentSchema = getNonNullSchema(currentSchema);
+ if (currentSchema.getField(entityName) == null) {
+ // Tried to step into a schema that doesn't exist. Break out
+ // of the loop
+ break;
+ }
+ currentSchema = currentSchema.getField(entityName).schema();
+ fieldName = fieldName.substring(dotIndex + 1);
+ }
+ if (currentSchema == null
+ || getNonNullSchema(currentSchema).getField(fieldName) == null) {
+ result.add(f.name());
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Converts one record to another, given an input and output schema plus
+ * explicit mappings for certain target fields.
+ *
+ * @param input
+ * Input record to convert conforming to the inputSchema this
+ * converter was created with.
+ * @return Record converted to the outputSchema this converter was created
+ * with.
+ * @throws AvroConversionException
+ * When schemas do not match or illegal conversions are
+ * attempted, such as when numeric data fails to parse.
+ */
+ public Record convert(Record input) throws AvroConversionException {
+ Record result = new Record(outputSchema);
+ for (Field outputField : outputSchema.getFields()) {
+ // Default to matching by name
+ String inputFieldName = outputField.name();
+ if (fieldMapping.containsKey(outputField.name())) {
+ inputFieldName = fieldMapping.get(outputField.name());
+ }
+
+ IndexedRecord currentRecord = input;
+ Schema currentSchema = getNonNullSchema(inputSchema);
+ while (inputFieldName.contains(".")) {
+ // Recurse down the schema to find the right field.
+ int dotIndex = inputFieldName.indexOf('.');
+ String entityName = inputFieldName.substring(0, dotIndex);
+ // Get the record object
+ Object innerRecord = currentRecord.get(currentSchema.getField(
+ entityName).pos());
+ if (innerRecord == null) {
+ // Probably hit a null record here. Just break out of the
+ // loop so that null object will be passed to convertData
+ // below.
+ currentRecord = null;
+ break;
+ }
+ if (!(innerRecord instanceof IndexedRecord)) {
+ throw new AvroConversionException(inputFieldName
+ + " stepped through a non-record");
+ }
+ currentRecord = (IndexedRecord) innerRecord;
+
+ // Get the schema. In case we had an optional record, choose
+ // just the record.
+ currentSchema = currentSchema.getField(entityName).schema();
+ currentSchema = getNonNullSchema(currentSchema);
+ inputFieldName = inputFieldName.substring(dotIndex + 1);
+ }
+
+ // Current should now be in the right place to read the record.
+ Field f = currentSchema.getField(inputFieldName);
+ if (currentRecord == null) {
+ // We may have stepped into a null union type and gotten a null
+ // result.
+ Schema s = null;
+ if (f != null) {
+ s = f.schema();
+ }
+ result.put(outputField.name(),
+ convertData(null, s, outputField.schema()));
+ } else {
+ result.put(
+ outputField.name(),
+ convertData(currentRecord.get(f.pos()), f.schema(),
+ outputField.schema()));
+ }
+ }
+ return result;
+ }
+
+ public Schema getInputSchema() {
+ return inputSchema;
+ }
+
+ public Schema getOutputSchema() {
+ return outputSchema;
+ }
+
+ /**
+ * Converts the data from one schema to another. If the types are the same,
+ * no change will be made, but simple conversions will be attempted for
+ * other types.
+ *
+ * @param content
+ * The data to convert, generally taken from a field in an input
+ * Record.
+ * @param inputSchema
+ * The schema of the content object
+ * @param outputSchema
+ * The schema to convert to.
+ * @return The content object, converted to the output schema.
+ * @throws AvroConversionException
+ * When conversion is impossible, either because the output type
+ * is not supported or because numeric data failed to parse.
+ */
+ private Object convertData(Object content, Schema inputSchema,
+ Schema outputSchema) throws AvroConversionException {
+ if (content == null) {
+ // No conversion can happen here.
+ if (supportsNull(outputSchema)) {
+ return null;
+ }
+ throw new AvroConversionException("Output schema " + outputSchema
+ + " does not support null");
+ }
+
+ Schema nonNullInput = getNonNullSchema(inputSchema);
+ Schema nonNullOutput = getNonNullSchema(outputSchema);
+ if (nonNullInput.getType().equals(nonNullOutput.getType())) {
+ return content;
+ } else {
+ if (nonNullOutput.getType() == Schema.Type.STRING) {
+ return content.toString();
+ }
+
+ // For the non-string cases of these, we will try to convert through
+ // string using Scanner to validate types. This means we could
+ // return questionable results when a String starts with a number
+ // but then contains other content
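+ // (e.g. "5 tons" scans as the int 5, whereas "5tons" fails to parse).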
+ Scanner scanner = new Scanner(content.toString());
+ switch (nonNullOutput.getType()) {
+ case LONG:
+ if (scanner.hasNextLong()) {
+ return scanner.nextLong();
+ } else {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to long");
+ }
+ case INT:
+ if (scanner.hasNextInt()) {
+ return scanner.nextInt();
+ } else {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to int");
+ }
+ case DOUBLE:
+ if (scanner.hasNextDouble()) {
+ return scanner.nextDouble();
+ } else {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to double");
+ }
+ case FLOAT:
+ if (scanner.hasNextFloat()) {
+ return scanner.nextFloat();
+ } else {
+ throw new AvroConversionException("Cannot convert "
+ + content + " to float");
+ }
+ default:
+ throw new AvroConversionException("Cannot convert to type "
+ + nonNullOutput.getType());
+ }
+ }
+ }
+
+ /**
+ * If s is a union schema of some type with null, returns that type.
+ * Otherwise returns the schema itself.
+ *
+ * Does not handle unions of schemas with anything except null and one type.
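+ * For example, a union schema of ["null", "string"] yields the plain string schema.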
+ *
+ * @param s
+ * Schema to remove the nullable union from.
+ * @return The Schema of the non-null part of the union, if the input was
+ * a union type. Otherwise returns the input schema.
+ */
+ protected static Schema getNonNullSchema(Schema s) {
+ // Handle the case where s is a union type. Assert that this must be a
+ // union that only includes one non-null type.
+ if (s.getType() == Schema.Type.UNION) {
+ List<Schema> types = s.getTypes();
+ boolean foundOne = false;
+ Schema result = s;
+ for (Schema type : types) {
+ if (!type.getType().equals(Schema.Type.NULL)) {
+ Preconditions.checkArgument(!foundOne,
+ "Cannot handle union of two non-null types");
+ foundOne = true;
+ result = type;
+ }
+ }
+ return result;
+ } else {
+ return s;
+ }
+ }
+
+ protected static boolean supportsNull(Schema s) {
+ if (s.getType() == Schema.Type.NULL) {
+ return true;
+ } else if (s.getType() == Schema.Type.UNION) {
+ for (Schema type : s.getTypes()) {
+ if (type.getType() == Schema.Type.NULL) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Exception thrown when Avro conversion fails.
+ */
+ public static class AvroConversionException extends Exception {
+ public AvroConversionException(String string, IOException e) {
+ super(string, e);
+ }
+
+ public AvroConversionException(String string) {
+ super(string);
+ }
+ }
+}
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
new file mode 100644
index 0000000000..0d9f6586b5
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
+import org.apache.nifi.util.LongHolder;
+import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.SchemaNotFoundException;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+@Tags({ "avro", "convert", "kite" })
+@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
+@DynamicProperty(name = "Field name from input schema",
+    value = "Field name for output schema",
+    description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
+public class ConvertAvroSchema extends AbstractKiteProcessor {
+
+ private static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Avro content that converted successfully").build();
+
+ private static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure").description("Avro content that failed to convert")
+ .build();
+
+ /**
+ * Makes sure the output schema is a valid output schema and that all of its
+ * fields are mapped, either automatically or via explicit dynamic properties.
+ */
+ protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(String subject, String uri,
+ ValidationContext context) {
+ Configuration conf = getConfiguration(context.getProperty(
+ CONF_XML_FILES).getValue());
+ String inputUri = context.getProperty(INPUT_SCHEMA).getValue();
+ String error = null;
+
+ final boolean elPresent = context
+ .isExpressionLanguageSupported(subject)
+ && context.isExpressionLanguagePresent(uri);
+ if (!elPresent) {
+ try {
+ Schema outputSchema = getSchema(uri, conf);
+ Schema inputSchema = getSchema(inputUri, conf);
+ // Get the explicitly mapped fields. This is identical to
+ // logic in onTrigger, but ValidationContext and
+ // ProcessContext share no ancestor, so we cannot generalize
+ // the code.
+ Map<String, String> fieldMapping = new HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : context
+ .getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ fieldMapping.put(entry.getKey().getName(),
+ entry.getValue());
+ }
+ }
+ AvroRecordConverter converter = new AvroRecordConverter(
+ inputSchema, outputSchema, fieldMapping);
+ Collection<String> unmappedFields = converter
+ .getUnmappedFields();
+ if (unmappedFields.size() > 0) {
+ error = "The following fields are unmapped: "
+ + unmappedFields;
+ }
+
+ } catch (SchemaNotFoundException e) {
+ error = e.getMessage();
+ }
+ }
+ return new ValidationResult.Builder().subject(subject).input(uri)
+ .explanation(error).valid(error == null).build();
+ }
+ };
+
+ @VisibleForTesting
+ static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
+ .name("Input Schema").description("Avro Schema of Input Flowfiles")
+ .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true)
+ .required(true).build();
+
+ @VisibleForTesting
+ static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder()
+ .name("Output Schema")
+ .description("Avro Schema of Output Flowfiles")
+ .addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true)
+ .required(true).build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
+ .<PropertyDescriptor> builder()
+ .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
+ .add(OUTPUT_SCHEMA).build();
+
+ private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
+ .<Relationship> builder().add(SUCCESS).add(FAILURE).build();
+
+ private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern
+ .compile("[A-Za-z_][A-Za-z0-9_\\.]*");
+
+ /**
+ * Validates that the input and output fields (from dynamic properties) are
+ * all valid Avro field names, including "." to step into records.
+ */
+ protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator() {
+ @Override
+ public ValidationResult validate(final String subject,
+ final String value, final ValidationContext context) {
+ if (context.isExpressionLanguageSupported(subject)
+ && context.isExpressionLanguagePresent(value)) {
+ return new ValidationResult.Builder().subject(subject)
+ .input(value)
+ .explanation("Expression Language Present").valid(true)
+ .build();
+ }
+
+ String reason = "";
+ if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) {
+ reason = subject + " is not a valid Avro fieldname. ";
+ }
+ if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) {
+ reason = reason + value + " is not a valid Avro fieldname";
+ }
+
+ return new ValidationResult.Builder().subject(subject).input(value)
+ .explanation(reason).valid(reason.equals("")).build();
+ }
+ };
+
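+ /**
+ * A dynamic property defines one field mapping. For example, a property
+ * named "parent.id" with value "parentId" copies the nested input field
+ * parent.id into the top-level output field parentId.
+ */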
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
+ final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description(
+ "Field mapping between schemas. The property name is the field name for the input "
+ + "schema, and the property value is the field name for the output schema. For fields "
+ + "not listed, the processor tries to match names from the input to the output record.")
+ .dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build();
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, final ProcessSession session)
+ throws ProcessException {
+ FlowFile incomingAvro = session.get();
+ if (incomingAvro == null) {
+ return;
+ }
+
+ String inputSchemaProperty = context.getProperty(INPUT_SCHEMA)
+ .evaluateAttributeExpressions(incomingAvro).getValue();
+ final Schema inputSchema;
+ try {
+ inputSchema = getSchema(inputSchemaProperty,
+ DefaultConfiguration.get());
+ } catch (SchemaNotFoundException e) {
+ getLogger().error("Cannot find schema: " + inputSchemaProperty);
+ session.transfer(incomingAvro, FAILURE);
+ return;
+ }
+ String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA)
+ .evaluateAttributeExpressions(incomingAvro).getValue();
+ final Schema outputSchema;
+ try {
+ outputSchema = getSchema(outputSchemaProperty,
+ DefaultConfiguration.get());
+ } catch (SchemaNotFoundException e) {
+ getLogger().error("Cannot find schema: " + outputSchemaProperty);
+ session.transfer(incomingAvro, FAILURE);
+ return;
+ }
+ final Map<String, String> fieldMapping = new HashMap<>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : context
+ .getProperties().entrySet()) {
+ if (entry.getKey().isDynamic()) {
+ fieldMapping.put(entry.getKey().getName(), entry.getValue());
+ }
+ }
+ final AvroRecordConverter converter = new AvroRecordConverter(
+ inputSchema, outputSchema, fieldMapping);
+
+ final DataFileWriter<Record> writer = new DataFileWriter<>(
+ AvroUtil.newDatumWriter(outputSchema, Record.class));
+ writer.setCodec(CodecFactory.snappyCodec());
+
+ final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
+ AvroUtil.newDatumWriter(outputSchema, Record.class));
+ failureWriter.setCodec(CodecFactory.snappyCodec());
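+ // Note: the failure path re-emits unconverted input records with the
+ // input schema (set when failureWriter.create is called below).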
+
+ try {
+ final LongHolder written = new LongHolder(0L);
+ final FailureTracker failures = new FailureTracker();
+
+ final List<Record> badRecords = Lists.newLinkedList();
+ FlowFile incomingAvroCopy = session.clone(incomingAvro);
+ FlowFile outgoingAvro = session.write(incomingAvro,
+ new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out)
+ throws IOException {
+ try (DataFileStream<Record> stream = new DataFileStream<>(
+ in, new GenericDatumReader<Record>(
+ converter.getInputSchema()))) {
+ try (DataFileWriter<Record> w = writer.create(
+ outputSchema, out)) {
+ for (Record record : stream) {
+ try {
+ Record converted = converter
+ .convert(record);
+ w.append(converted);
+ written.incrementAndGet();
+ } catch (AvroConversionException e) {
+ failures.add(e);
+ getLogger().error(
+ "Error converting data: "
+ + e.getMessage());
+ badRecords.add(record);
+ }
+ }
+ }
+ }
+ }
+ });
+
+ FlowFile badOutput = session.write(incomingAvroCopy,
+ new StreamCallback() {
+ @Override
+ public void process(InputStream in, OutputStream out)
+ throws IOException {
+
+ try (DataFileWriter<Record> w = failureWriter
+ .create(inputSchema, out)) {
+ for (Record record : badRecords) {
+ w.append(record);
+ }
+ }
+
+ }
+ });
+
+ long errors = failures.count();
+
+ // Update counters only if the session commits successfully.
+ session.adjustCounter("Converted records", written.get(), false);
+ session.adjustCounter("Conversion errors", errors, false);
+
+ if (written.get() > 0L) {
+ session.transfer(outgoingAvro, SUCCESS);
+ } else {
+ session.remove(outgoingAvro);
+
+ if (errors == 0L) {
+ badOutput = session.putAttribute(badOutput, "errors",
+ "No incoming records");
+ session.transfer(badOutput, FAILURE);
+ }
+ }
+
+ if (errors > 0L) {
+ getLogger().warn(
+ "Failed to convert {}/{} records between Avro Schemas",
+ new Object[] { errors, errors + written.get() });
+ badOutput = session.putAttribute(badOutput, "errors",
+ failures.summary());
+ session.transfer(badOutput, FAILURE);
+ } else {
+ session.remove(badOutput);
+ }
+ } catch (ProcessException | DatasetIOException e) {
+ getLogger().error("Failed reading or writing", e);
+ session.transfer(incomingAvro, FAILURE);
+ } catch (DatasetException e) {
+ getLogger().error("Failed to read FlowFile", e);
+ session.transfer(incomingAvro, FAILURE);
+ }
+ }
+}
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6de56122a6..ea99ff6548 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
org.apache.nifi.processors.kite.StoreInKiteDataset
org.apache.nifi.processors.kite.ConvertCSVToAvro
org.apache.nifi.processors.kite.ConvertJSONToAvro
+org.apache.nifi.processors.kite.ConvertAvroSchema
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
new file mode 100644
index 0000000000..f5d8a1deb8
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
@@ -0,0 +1,142 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+    <title>ConvertAvroSchema</title>
+    <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+</head>
+<body>
+        <h2>Description:</h2>
+
+        <p>
+            This processor is used to convert data between two Avro formats, such as those coming from the ConvertCSVToAvro or
+            ConvertJSONToAvro processors. The input and output content of the flow files should be Avro data files. The processor
+            includes support for the following basic type conversions:
+        </p>
+
+        <ul>
+            <li>Anything to String, using the data's default String representation</li>
+            <li>String types to numeric types int, long, double, and float</li>
+            <li>Conversion to and from optional Avro types</li>
+        </ul>
+        <p>
+            In addition, fields can be renamed or unpacked from a record type by using the dynamic properties.
+        </p>
+
+
+        <h2>Mapping Example:</h2>
+
+        <p>
+            Throughout this example, we will refer to input data with a schema like the following (illustrative; any
+            record schema with these field names behaves the same way):
+        </p>
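+        <pre>
+{
+    "type": "record",
+    "name": "CustomerInput",
+    "namespace": "org.apache.example",
+    "fields": [
+        { "name": "id", "type": "string" },
+        { "name": "companyName", "type": ["null", "string"], "default": null },
+        { "name": "revenue", "type": ["null", "string"], "default": null },
+        { "name": "parent", "type": ["null", {
+            "type": "record",
+            "name": "ParentCompany",
+            "namespace": "org.apache.example",
+            "fields": [
+                { "name": "name", "type": ["null", "string"], "default": null },
+                { "name": "id", "type": "string" }
+            ]
+        } ], "default": null }
+    ]
+}
+        </pre>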
+        <p>
+            Note that even though the id and revenue fields are typed as string, they logically hold a long and a
+            double, respectively. By default, fields with matching names are mapped automatically, so records could be
+            converted to an output schema such as the following without using dynamic properties:
+        </p>
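+        <pre>
+{
+    "type": "record",
+    "name": "SimpleCustomerRecord",
+    "namespace": "org.apache.example",
+    "fields": [
+        { "name": "id", "type": "long" },
+        { "name": "companyName", "type": "string" },
+        { "name": "revenue", "type": "double" }
+    ]
+}
+        </pre>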
+        <p>
+            To rename companyName to name and to extract the parent's id field, both an output schema and dynamic
+            properties must be provided. For example, to convert to a schema such as the following:
+        </p>
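+        <pre>
+{
+    "type": "record",
+    "name": "SimpleCustomerRecord",
+    "namespace": "org.apache.example",
+    "fields": [
+        { "name": "id", "type": "long" },
+        { "name": "name", "type": "string" },
+        { "name": "revenue", "type": "double" },
+        { "name": "parentId", "type": "long" }
+    ]
+}
+        </pre>
+        <p>
+            Two dynamic properties would be added: one named "companyName" with value "name", and one named "parent.id"
+            with value "parentId".
+        </p>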