diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
index 2fd2344037..62aaec85a2 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/pom.xml
@@ -130,4 +130,21 @@
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ src/test/resources/*.csv
+ src/test/resources/*.json
+ src/test/resources/*.avro
+
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
new file mode 100644
index 0000000000..ad8b7e5c88
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchema.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.Schema;
+import org.apache.commons.lang.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.kitesdk.data.spi.JsonUtil;
+import org.kitesdk.data.spi.filesystem.CSVProperties;
+import org.kitesdk.data.spi.filesystem.CSVUtil;
+
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.BufferedReader;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Tags({"kite", "avro", "infer", "schema", "csv", "json"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Examines the contents of the incoming FlowFile to infer an Avro schema. The processor will" +
+ " use the Kite SDK to make an attempt to automatically generate an Avro schema from the incoming content." +
+ " When inferring the schema from JSON data the key names will be used in the resulting Avro schema" +
+ " definition. When inferring from CSV data a \"header definition\" must be present either as the first line of the incoming data" +
+ " or the \"header definition\" must be explicitly set in the property \"CSV Header Definition\". A \"header definition\"" +
+ " is simply a single comma separated line defining the names of each column. The \"header definition\" is" +
+ " required in order to determine the names that should be given to each field in the resulting Avro definition." +
+ " When inferring data types the higher order data type is always used if there is ambiguity." +
+ " For example when examining numerical values the type may be set to \"long\" instead of \"integer\" since a long can" +
+ " safely hold the value of any \"integer\". Only CSV and JSON content is currently supported for automatically inferring an" +
+ " Avro schema. The type of content present in the incoming FlowFile is set by using the property \"Input Content Type\"." +
+ " The property can either be explicitly set to CSV, JSON, or \"use mime.type value\" which will examine the" +
+ " value of the mime.type attribute on the incoming FlowFile to determine the type of content present.")
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "mime.type", description = "If configured by property \"Input Content Type\" will" +
+ " use this value to determine what sort of content should be inferred from the incoming FlowFile content."),
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "inferred.avro.schema", description = "If configured by \"Schema output destination\" to" +
+ " write to an attribute this will hold the resulting Avro schema from inferring the incoming FlowFile content."),
+})
+public class InferAvroSchema
+ extends AbstractKiteProcessor {
+
+ public static final String CSV_DELIMITER = ",";
+ public static final String USE_MIME_TYPE = "use mime.type value";
+ public static final String JSON_CONTENT = "json";
+ public static final String CSV_CONTENT = "csv";
+
+ public static final String AVRO_SCHEMA_ATTRIBUTE_NAME = "inferred.avro.schema";
+ public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
+ public static final String DESTINATION_CONTENT = "flowfile-content";
+ public static final String JSON_MIME_TYPE = "application/json";
+ public static final String CSV_MIME_TYPE = "text/csv";
+ public static final String AVRO_MIME_TYPE = "application/avro-binary";
+ public static final String AVRO_FILE_EXTENSION = ".avro";
+
+ public static final PropertyDescriptor SCHEMA_DESTINATION = new PropertyDescriptor.Builder()
+ .name("Schema Output Destination")
+ .description("Control if Avro schema is written as a new flowfile attribute '" + AVRO_SCHEMA_ATTRIBUTE_NAME + "' " +
+ "or written in the flowfile content. Writing to flowfile content will overwrite any " +
+ "existing flowfile content.")
+ .required(true)
+ .allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
+ .defaultValue(DESTINATION_CONTENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor INPUT_CONTENT_TYPE = new PropertyDescriptor.Builder()
+ .name("Input Content Type")
+ .description("Content Type of data present in the incoming FlowFile's content. Only \"" +
+ JSON_CONTENT + "\" or \"" + CSV_CONTENT + "\" are supported." +
+ " If this value is set to \"" + USE_MIME_TYPE + "\" the incoming Flowfile's attribute \"" + CoreAttributes.MIME_TYPE + "\"" +
+ " will be used to determine the Content Type.")
+ .allowableValues(USE_MIME_TYPE, JSON_CONTENT, CSV_CONTENT)
+ .defaultValue(USE_MIME_TYPE)
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor GET_CSV_HEADER_DEFINITION_FROM_INPUT = new PropertyDescriptor.Builder()
+ .name("Get CSV Header Definition From Data")
+ .description("This property only applies to CSV content type. If \"true\" the processor will attempt to read the CSV header definition from the" +
+ " first line of the input data.")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CSV_HEADER_DEFINITION = new PropertyDescriptor.Builder()
+ .name("CSV Header Definition")
+ .description("This property only applies to CSV content type. Comma separated string defining the column names expected in the CSV data." +
+ " EX: \"fname,lname,zip,address\". The elements present in this string should be in the same order" +
+ " as the underlying data. Setting this property will cause the value of" +
+ " \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" to be ignored instead using this value.")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .defaultValue(null)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+
+ public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder()
+ .name("CSV Header Line Skip Count")
+ .description("This property only applies to CSV content type. Specifies the number of lines that should be skipped when reading the CSV data." +
+ " Setting this value to 0 is equivalent to saying \"the entire contents of the file should be read\". If the" +
+ " property \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" is set then the first line of the CSV " +
+ " file will be read in and treated as the CSV header definition. Since this will remove the header line from the data" +
+ " care should be taken to make sure the value of \"CSV header Line Skip Count\" is set to 0 to ensure" +
+ " no data is skipped.")
+ .required(true)
+ .defaultValue("0")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder()
+ .name("CSV Escape String")
+ .description("This property only applies to CSV content type. String that represents an escape sequence" +
+ " in the CSV FlowFile content data.")
+ .required(true)
+ .defaultValue("\\")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder()
+ .name("CSV Quote String")
+ .description("This property only applies to CSV content type. String that represents a literal quote" +
+ " character in the CSV FlowFile content data.")
+ .required(true)
+ .defaultValue("'")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
+ .name("Avro Record Name")
+ .description("Value to be placed in the Avro record schema \"name\" field.")
+ .required(true)
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+ .name("Charset")
+ .description("Character encoding of CSV data.")
+ .required(true)
+ .defaultValue("UTF-8")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
+ .name("Pretty Avro Output")
+ .description("If true the Avro output will be formatted.")
+ .required(true)
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
+ .name("Number Of Records To Analyze")
+ .description("This property only applies to JSON content type. The number of JSON records that should be" +
+ " examined to determine the Avro schema. The higher the value the better chance kite has of detecting" +
+ " the appropriate type. However the default value of 10 is almost always enough.")
+ .required(true)
+ .defaultValue("10")
+ .expressionLanguageSupported(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .build();
+
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
+ .description("Successfully created Avro schema from data.").build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
+ .description("Original incoming FlowFile data").build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
+ .description("Failed to create Avro schema from data.").build();
+
+ public static final Relationship REL_UNSUPPORTED_CONTENT = new Relationship.Builder().name("unsupported content")
+ .description("The content found in the flowfile content is not of the required format.").build();
+
+ private List properties;
+ private Set relationships;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List properties = new ArrayList<>();
+ properties.add(SCHEMA_DESTINATION);
+ properties.add(INPUT_CONTENT_TYPE);
+ properties.add(CSV_HEADER_DEFINITION);
+ properties.add(GET_CSV_HEADER_DEFINITION_FROM_INPUT);
+ properties.add(HEADER_LINE_SKIP_COUNT);
+ properties.add(ESCAPE_STRING);
+ properties.add(QUOTE_STRING);
+ properties.add(PRETTY_AVRO_OUTPUT);
+ properties.add(RECORD_NAME);
+ properties.add(NUM_RECORDS_TO_ANALYZE);
+ properties.add(CHARSET);
+ this.properties = Collections.unmodifiableList(properties);
+
+ final Set relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_ORIGINAL);
+ relationships.add(REL_UNSUPPORTED_CONTENT);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final FlowFile original = session.get();
+ if (original == null) {
+ return;
+ }
+
+ try {
+
+ final AtomicReference avroSchema = new AtomicReference<>();
+ switch (context.getProperty(INPUT_CONTENT_TYPE).getValue()) {
+ case USE_MIME_TYPE:
+ avroSchema.set(inferAvroSchemaFromMimeType(original, context, session));
+ break;
+ case JSON_CONTENT:
+ avroSchema.set(inferAvroSchemaFromJSON(original, context, session));
+ break;
+ case CSV_CONTENT:
+ avroSchema.set(inferAvroSchemaFromCSV(original, context, session));
+ break;
+ default:
+ //Shouldn't be possible but just in case
+ session.transfer(original, REL_UNSUPPORTED_CONTENT);
+ break;
+ }
+
+
+ if (StringUtils.isNotEmpty(avroSchema.get())) {
+
+ String destination = context.getProperty(SCHEMA_DESTINATION).getValue();
+ FlowFile avroSchemaFF = null;
+
+ switch (destination) {
+ case DESTINATION_ATTRIBUTE:
+ avroSchemaFF = session.putAttribute(session.clone(original), AVRO_SCHEMA_ATTRIBUTE_NAME, avroSchema.get());
+ //Leaves the original CoreAttributes.MIME_TYPE in place.
+ break;
+ case DESTINATION_CONTENT:
+ avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ out.write(avroSchema.get().getBytes());
+ }
+ });
+ avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
+ break;
+ default:
+ break;
+ }
+
+ //Transfer the sessions.
+ avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.FILENAME.key(), (original.getAttribute(CoreAttributes.FILENAME.key()) + AVRO_FILE_EXTENSION));
+ session.transfer(avroSchemaFF, REL_SUCCESS);
+ session.transfer(original, REL_ORIGINAL);
+ } else {
+ //If the avroSchema is null then the content type is unknown and therefore unsupported
+ session.transfer(original, REL_UNSUPPORTED_CONTENT);
+ }
+
+ } catch (Exception ex) {
+ getLogger().error("Failed to infer Avro schema for {} due to {}", new Object[] {original, ex});
+ session.transfer(original, REL_FAILURE);
+ }
+ }
+
+
+ /**
+ * Infers the Avro schema from the input Flowfile content. To infer an Avro schema for CSV content a header line is
+ * required. You can configure the processor to pull that header line from the first line of the CSV data if it is
+ * present OR you can manually supply the desired header line as a property value.
+ *
+ * @param inputFlowFile
+ * The original input FlowFile containing the CSV content as it entered this processor.
+ *
+ * @param context
+ * ProcessContext to pull processor configurations.
+ *
+ * @param session
+ * ProcessSession to transfer FlowFiles
+ */
+ private String inferAvroSchemaFromCSV(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
+
+ //Determines the header line either from the property input or the first line of the delimited file.
+ final AtomicReference header = new AtomicReference<>();
+ final AtomicReference hasHeader = new AtomicReference<>();
+
+ if (context.getProperty(GET_CSV_HEADER_DEFINITION_FROM_INPUT).asBoolean() == Boolean.TRUE) {
+ //Read the first line of the file to get the header value.
+ session.read(inputFlowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ BufferedReader br = new BufferedReader(new InputStreamReader(in));
+ header.set(br.readLine());
+ hasHeader.set(Boolean.TRUE);
+ br.close();
+ }
+ });
+ hasHeader.set(Boolean.TRUE);
+ } else {
+ header.set(context.getProperty(CSV_HEADER_DEFINITION).evaluateAttributeExpressions(inputFlowFile).getValue());
+ hasHeader.set(Boolean.FALSE);
+ }
+
+ //Prepares the CSVProperties for kite
+ final CSVProperties props = new CSVProperties.Builder()
+ .delimiter(CSV_DELIMITER)
+ .escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions().getValue())
+ .quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions().getValue())
+ .header(header.get())
+ .hasHeader(hasHeader.get())
+ .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions().asInteger())
+ .charset(context.getProperty(CHARSET).getValue())
+ .build();
+
+ final AtomicReference avroSchema = new AtomicReference<>();
+
+ session.read(inputFlowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ avroSchema.set(CSVUtil
+ .inferSchema(
+ context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), in, props)
+ .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
+ }
+ });
+
+ return avroSchema.get();
+ }
+
+ /**
+ * Infers the Avro schema from the input Flowfile content.
+ *
+ * @param inputFlowFile
+ * The original input FlowFile containing the JSON content as it entered this processor.
+ *
+ * @param context
+ * ProcessContext to pull processor configurations.
+ *
+ * @param session
+ * ProcessSession to transfer FlowFiles
+ */
+ private String inferAvroSchemaFromJSON(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
+
+ final AtomicReference avroSchema = new AtomicReference<>();
+ session.read(inputFlowFile, new InputStreamCallback() {
+ @Override
+ public void process(InputStream in) throws IOException {
+ Schema as = JsonUtil.inferSchema(
+ in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(),
+ context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions().asInteger());
+ avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
+
+ }
+ });
+
+ return avroSchema.get();
+ }
+
+ /**
+ * Examines the incoming FlowFiles mime.type attribute to determine if the schema should be inferred for CSV or JSON data.
+ *
+ * @param inputFlowFile
+ * The original input FlowFile containing the content.
+ *
+ * @param context
+ * ProcessContext to pull processor configurations.
+ *
+ * @param session
+ * ProcessSession to transfer FlowFiles
+ */
+ private String inferAvroSchemaFromMimeType(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
+
+ String mimeType = inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
+ String avroSchema = "";
+
+ if (mimeType!= null) {
+ switch (mimeType) {
+ case JSON_MIME_TYPE:
+ getLogger().debug("Inferred content type as JSON from \"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
+ inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
+ avroSchema = inferAvroSchemaFromJSON(inputFlowFile, context, session);
+ break;
+ case CSV_MIME_TYPE:
+ getLogger().debug("Inferred content type as CSV from \"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
+ inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
+ avroSchema = inferAvroSchemaFromCSV(inputFlowFile, context, session);
+ break;
+ default:
+ getLogger().warn("Unable to infer Avro Schema from {} because its mime type is {}, " +
+ " which is not supported by this Processor", new Object[] {inputFlowFile, mimeType} );
+ break;
+ }
+ }
+
+ return avroSchema;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
deleted file mode 100644
index 54ebedd8a5..0000000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-
-import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.List;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.ArrayList;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.kitesdk.data.spi.filesystem.CSVProperties;
-import org.kitesdk.data.spi.filesystem.CSVUtil;
-import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
-
-
-@Tags({"kite", "csv", "avro", "infer", "schema"})
-@SeeAlso({InferAvroSchemaFromCSV.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Creates an Avro schema from a CSV file header. The header line definition can either be provided" +
- "as a property to the processor OR present in the first line of CSV in the incoming FlowFile content. If a header" +
- " property is specified for this processor no attempt will be made to use the header line that may be present" +
- " in the incoming CSV FlowFile content.")
-public class InferAvroSchemaFromCSV
- extends AbstractKiteProcessor {
-
- public static final String CSV_DELIMITER = ",";
-
- public static final PropertyDescriptor HEADER_LINE = new PropertyDescriptor.Builder()
- .name("CSV Header Line")
- .description("Comma separated string defining the column names expected in the CSV data. " +
- "EX: \"fname,lname,zip,address\"")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder()
- .name("CSV Header Line Skip Count")
- .description("Specifies the number of header lines that should be skipped when reading the CSV data. If the " +
- " first line of the CSV data is a header line and you specify the \"CSV Header Line\" property " +
- "you need to set this vlaue to 1 otherwise the header line will be treated as actual data.")
- .required(true)
- .defaultValue("0")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder()
- .name("CSV escape string")
- .description("String that represents an escape sequence in the CSV FlowFile content data.")
- .required(true)
- .defaultValue("\\")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder()
- .name("CSV quote string")
- .description("String that represents a literal quote character in the CSV FlowFile content data.")
- .required(true)
- .defaultValue("'")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
- .name("Avro Record Name")
- .description("Value to be placed in the Avro record schema \"name\" field.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("Charset")
- .description("Character encoding of CSV data.")
- .required(true)
- .defaultValue("UTF-8")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
- .name("Pretty Avro Output")
- .description("If true the Avro output will be formatted.")
- .required(true)
- .defaultValue("true")
- .allowableValues("true", "false")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
- .description("Successfully created Avro schema for CSV data.").build();
-
- public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
- .description("Original incoming FlowFile CSV data").build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("Failed to create Avro schema for CSV data.").build();
-
- private List properties;
- private Set relationships;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List properties = new ArrayList<>();
- properties.add(HEADER_LINE);
- properties.add(HEADER_LINE_SKIP_COUNT);
- properties.add(ESCAPE_STRING);
- properties.add(QUOTE_STRING);
- properties.add(PRETTY_AVRO_OUTPUT);
- properties.add(RECORD_NAME);
- properties.add(CHARSET);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- relationships.add(REL_ORIGINAL);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected List getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public Set getRelationships() {
- return relationships;
- }
-
-
- @Override
- public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
- final FlowFile original = session.get();
- if (original == null) {
- return;
- }
-
- try {
-
- //Determines the header line either from the property input or the first line of the delimited file.
- final AtomicReference header = new AtomicReference<>();
- final AtomicReference hasHeader = new AtomicReference<>();
-
- if (context.getProperty(HEADER_LINE).isSet()) {
- header.set(context.getProperty(HEADER_LINE).getValue());
- hasHeader.set(Boolean.FALSE);
- } else {
- //Read the first line of the file to get the header value.
- session.read(original, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- BufferedReader br = new BufferedReader(new InputStreamReader(in));
- header.set(br.readLine());
- hasHeader.set(Boolean.TRUE);
- br.close();
- }
- });
- }
-
- //Prepares the CSVProperties for kite
- final CSVProperties props = new CSVProperties.Builder()
- .delimiter(CSV_DELIMITER)
- .escape(context.getProperty(ESCAPE_STRING).getValue())
- .quote(context.getProperty(QUOTE_STRING).getValue())
- .header(header.get())
- .hasHeader(hasHeader.get())
- .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).asInteger())
- .charset(context.getProperty(CHARSET).getValue())
- .build();
-
- final Set required = ImmutableSet.of();
- final AtomicReference avroSchema = new AtomicReference<>();
-
- session.read(original, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- avroSchema.set(CSVUtil
- .inferNullableSchema(
- context.getProperty(RECORD_NAME).getValue(), in, props, required)
- .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
- }
- });
-
- FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(avroSchema.get().getBytes());
- }
- });
-
- //Transfer the sessions.
- session.transfer(original, REL_ORIGINAL);
- session.transfer(avroSchemaFF, REL_SUCCESS);
-
- } catch (Exception ex) {
- getLogger().error(ex.getMessage());
- session.transfer(original, REL_FAILURE);
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
deleted file mode 100644
index 77029ff304..0000000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import org.apache.avro.Schema;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.kitesdk.data.spi.JsonUtil;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicReference;
-
-@Tags({"kite", "json", "avro", "infer", "schema"})
-@SeeAlso({InferAvroSchemaFromJSON.class})
-@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Creates an Avro schema from JSON data. The Avro schema is inferred by examining the fields " +
- "in the JSON input. The Avro schema generated by kite will use the same names present in the incoming JSON payload")
-public class InferAvroSchemaFromJSON
- extends AbstractKiteProcessor {
-
- public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
- .name("Avro Record Name")
- .description("Value to be placed in the Avro record schema \"name\" field.")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
- .name("Number of records to analyze")
- .description("Number of records that should be analyzed by kite to infer the Avro schema")
- .required(true)
- .defaultValue("10")
- .addValidator(StandardValidators.INTEGER_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
- .name("Charset")
- .description("Character encoding of CSV data.")
- .required(true)
- .defaultValue("UTF-8")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
- .name("Pretty Avro Output")
- .description("If true the Avro output will be formatted.")
- .required(true)
- .defaultValue("true")
- .allowableValues("true", "false")
- .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
- .build();
-
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
- .description("Successfully created Avro schema for JSON data.").build();
-
- public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
- .description("Original incoming FlowFile JSON data").build();
-
- public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
- .description("Failed to create Avro schema for JSON data.").build();
-
- private List properties;
- private Set relationships;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List properties = new ArrayList<>();
- properties.add(CHARSET);
- properties.add(PRETTY_AVRO_OUTPUT);
- properties.add(RECORD_NAME);
- properties.add(NUM_RECORDS_TO_ANALYZE);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- relationships.add(REL_ORIGINAL);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- protected List getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public Set getRelationships() {
- return relationships;
- }
-
-
- @Override
- public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
- final FlowFile original = session.get();
- if (original == null) {
- return;
- }
-
- try {
-
- final AtomicReference avroSchema = new AtomicReference<>();
- session.read(original, new InputStreamCallback() {
- @Override
- public void process(InputStream in) throws IOException {
- Schema as = JsonUtil.inferSchema(
- in, context.getProperty(RECORD_NAME).getValue(), context.getProperty(NUM_RECORDS_TO_ANALYZE).asInteger());
- avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
-
- }
- });
-
- FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write(avroSchema.get().getBytes());
- }
- });
-
- //Transfer the FlowFiles
- session.transfer(original, REL_ORIGINAL);
- session.transfer(avroSchemaFF, REL_SUCCESS);
-
- } catch (Exception ex) {
- getLogger().error(ex.getMessage());
- session.transfer(original, REL_FAILURE);
- }
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 7a89856f24..59fbe2d9c8 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -16,5 +16,4 @@ org.apache.nifi.processors.kite.StoreInKiteDataset
org.apache.nifi.processors.kite.ConvertCSVToAvro
org.apache.nifi.processors.kite.ConvertJSONToAvro
org.apache.nifi.processors.kite.ConvertAvroSchema
-org.apache.nifi.processors.kite.InferAvroSchemaFromCSV
-org.apache.nifi.processors.kite.InferAvroSchemaFromJSON
+org.apache.nifi.processors.kite.InferAvroSchema
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
new file mode 100644
index 0000000000..3e8e70202b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchema.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.junit.Assert;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestInferAvroSchema {
+
+ private TestRunner runner = null;
+
+ @Before
+ public void setup() {
+ runner = TestRunners.newTestRunner(InferAvroSchema.class);
+
+ //Prepare the common setup.
+ runner.assertNotValid();
+
+ runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
+ runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
+ runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_CONTENT);
+ runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "0");
+ runner.setProperty(InferAvroSchema.ESCAPE_STRING, "\\");
+ runner.setProperty(InferAvroSchema.QUOTE_STRING, "'");
+ runner.setProperty(InferAvroSchema.RECORD_NAME, "com.jeremydyer.contact");
+ runner.setProperty(InferAvroSchema.CHARSET, "UTF-8");
+ runner.setProperty(InferAvroSchema.PRETTY_AVRO_OUTPUT, "true");
+ }
+
+ @Test
+ public void inferAvroSchemaFromHeaderDefinitionOfCSVFile() throws Exception {
+
+ runner.assertValid();
+
+ Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+ runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
+
+ runner.run();
+ runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+ runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+ flowFile.assertContentEquals(new File("src/test/resources/Shapes_Header.csv.avro").toPath());
+ flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+ }
+
+ @Test
+ public void inferAvroSchemaFromJSONFile() throws Exception {
+
+ runner.assertValid();
+
+ runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
+
+ //Purposely set to True to test that none of the JSON file is read which would cause issues.
+ runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
+ runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
+
+ Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
+ runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes);
+
+ runner.run();
+ runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+ runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+ MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+ String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
+ String knownSchema = FileUtils.readFileToString(new File("src/test/resources/Shapes.json.avro"));
+ Assert.assertEquals(avroSchema, knownSchema);
+
+ //Since that avro schema is written to an attribute this should be teh same as the original
+ data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
+ }
+
+ @Test
+ public void inferAvroSchemaFromCSVFile() throws Exception {
+
+ runner.assertValid();
+
+ //Read in the header
+ StringWriter writer = new StringWriter();
+ IOUtils.copy((Files.newInputStream(Paths.get("src/test/resources/ShapesHeader.csv"), StandardOpenOption.READ)), writer, "UTF-8");
+ runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, writer.toString());
+ runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "false");
+
+ Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+ runner.enqueue(new File("src/test/resources/Shapes_NoHeader.csv").toPath(), attributes);
+
+ runner.run();
+ runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+ runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+ MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+ data.assertContentEquals(Paths.get("src/test/resources/Shapes_Header.csv.avro"));
+ data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+ }
+
+ @Test
+ public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception {
+
+ final String CSV_HEADER_LINE = FileUtils.readFileToString(new File("src/test/resources/ShapesHeader.csv"));
+
+ runner.assertValid();
+
+ runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "false");
+ runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, CSV_HEADER_LINE);
+ runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "1");
+
+ runner.assertValid();
+
+ Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+ runner.enqueue((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes(), attributes);
+
+ runner.run();
+ runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
+ runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
+
+ MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
+ data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
+ }
+
+ @Test
+ public void inferSchemaFromEmptyContent() throws Exception {
+ runner.assertValid();
+
+ Map attributes = new HashMap<>();
+ attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
+ runner.enqueue("", attributes);
+
+ runner.run();
+ runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 1);
+ runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 0);
+ runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 0);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
deleted file mode 100644
index 78c4eabfe1..0000000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-public class TestInferAvroSchemaFromCSV {
-
- private final String CSV_HEADER_LINE = "fname,lname,age,zip";
-
- @Test
- public void inferSchemaFromHeaderLineOfCSV() throws Exception {
- TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
-
- runner.assertNotValid();
- runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "0");
- runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
- runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
- runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
- runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
- runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
-
- runner.assertValid();
-
- ProcessSession session = runner.getProcessSessionFactory().createSession();
- FlowFile ff = session.write(session.create(), new OutputStreamCallback() {
- @Override
- public void process(OutputStream out) throws IOException {
- out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes());
- }
- });
-
- //Enqueue the empty FlowFile
- runner.enqueue(ff);
- runner.run();
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
- }
-
- @Test
- public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception {
- TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
-
- runner.assertNotValid();
- runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE);
- runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
- runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
- runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
- runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
- runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
- runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
-
- runner.assertValid();
-
- ProcessSession session = runner.getProcessSessionFactory().createSession();
- FlowFile ff = session.write(session.create(), new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes());
- }
- });
-
- //Enqueue the empty FlowFile
- runner.enqueue(ff);
- runner.run();
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
- }
-
- @Test
- public void inferSchemaFromEmptyContent() throws Exception {
- TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
-
- runner.assertNotValid();
- runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE);
- runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
- runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
- runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
- runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
- runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
- runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
-
- runner.assertValid();
-
- ProcessSession session = runner.getProcessSessionFactory().createSession();
- FlowFile ff = session.write(session.create(), new StreamCallback() {
- @Override
- public void process(InputStream in, OutputStream out) throws IOException {
- out.write("".getBytes());
- }
- });
-
- //Enqueue the empty FlowFile
- runner.enqueue(ff);
- runner.run();
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 1);
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 0);
- runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 0);
- }
-
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
deleted file mode 100644
index 1c63ba172f..0000000000
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java
+++ /dev/null
@@ -1,215 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nifi.processors.kite;
-
-import com.google.common.collect.Lists;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.file.DataFileStream;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.List;
-
-import static org.apache.nifi.processors.kite.TestUtil.streamFor;
-
-public class TestInferAvroSchemaFromJSON {
-
- public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
- .fields().requiredString("id").requiredString("primaryColor")
- .optionalString("secondaryColor").optionalString("price")
- .endRecord();
-
- public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
- .fields().requiredLong("id").requiredString("color")
- .optionalDouble("price").endRecord();
-
- public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";
-
- public static final String FAILURE_SUMMARY = "Cannot convert free to double";
-
- @Test
- public void testBasicConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
- runner.assertNotValid();
- runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
- INPUT_SCHEMA.toString());
- runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
- OUTPUT_SCHEMA.toString());
- runner.setProperty("primaryColor", "color");
- runner.assertValid();
-
- // Two valid rows, and one invalid because "free" is not a double.
- GenericData.Record goodRecord1 = dataBasic("1", "blue", null, null);
- GenericData.Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
- GenericData.Record badRecord = dataBasic("3", "red", "yellow", "free");
- List input = Lists.newArrayList(goodRecord1, goodRecord2,
- badRecord);
-
- runner.enqueue(streamFor(input));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 1 rows", 1, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 1);
-
- MockFlowFile incompatible = runner.getFlowFilesForRelationship(
- "failure").get(0);
- GenericDatumReader reader = new GenericDatumReader(
- INPUT_SCHEMA);
- DataFileStream stream = new DataFileStream(
- new ByteArrayInputStream(
- runner.getContentAsByteArray(incompatible)), reader);
- int count = 0;
- for (GenericData.Record r : stream) {
- Assert.assertEquals(badRecord, r);
- count++;
- }
- stream.close();
- Assert.assertEquals(1, count);
- Assert.assertEquals("Should accumulate error messages",
- FAILURE_SUMMARY, incompatible.getAttribute("errors"));
-
- GenericDatumReader successReader = new GenericDatumReader(
- OUTPUT_SCHEMA);
- DataFileStream successStream = new DataFileStream(
- new ByteArrayInputStream(runner.getContentAsByteArray(runner
- .getFlowFilesForRelationship("success").get(0))),
- successReader);
- count = 0;
- for (GenericData.Record r : successStream) {
- if (count == 0) {
- Assert.assertEquals(convertBasic(goodRecord1), r);
- } else {
- Assert.assertEquals(convertBasic(goodRecord2), r);
- }
- count++;
- }
- successStream.close();
- Assert.assertEquals(2, count);
- }
-
- @Test
- public void testNestedConversion() throws IOException {
- TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
- runner.assertNotValid();
- runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
- TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
- runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
- TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
- runner.setProperty("parent.id", "parentId");
- runner.assertValid();
-
- // Two valid rows
- GenericData.Record goodRecord1 = dataNested(1L, "200", null, null);
- GenericData.Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
- List input = Lists.newArrayList(goodRecord1, goodRecord2);
-
- runner.enqueue(streamFor(input));
- runner.run();
-
- long converted = runner.getCounterValue("Converted records");
- long errors = runner.getCounterValue("Conversion errors");
- Assert.assertEquals("Should convert 2 rows", 2, converted);
- Assert.assertEquals("Should reject 0 rows", 0, errors);
-
- runner.assertTransferCount("success", 1);
- runner.assertTransferCount("failure", 0);
-
- GenericDatumReader successReader = new GenericDatumReader(
- TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
- DataFileStream successStream = new DataFileStream(
- new ByteArrayInputStream(runner.getContentAsByteArray(runner
- .getFlowFilesForRelationship("success").get(0))),
- successReader);
- int count = 0;
- for (GenericData.Record r : successStream) {
- if (count == 0) {
- Assert.assertEquals(convertNested(goodRecord1), r);
- } else {
- Assert.assertEquals(convertNested(goodRecord2), r);
- }
- count++;
- }
- successStream.close();
- Assert.assertEquals(2, count);
- }
-
- private GenericData.Record convertBasic(GenericData.Record inputRecord) {
- GenericData.Record result = new GenericData.Record(OUTPUT_SCHEMA);
- result.put("id", Long.parseLong(inputRecord.get("id").toString()));
- result.put("color", inputRecord.get("primaryColor").toString());
- if (inputRecord.get("price") == null) {
- result.put("price", null);
- } else {
- result.put("price",
- Double.parseDouble(inputRecord.get("price").toString()));
- }
- return result;
- }
-
- private GenericData.Record dataBasic(String id, String primaryColor,
- String secondaryColor, String price) {
- GenericData.Record result = new GenericData.Record(INPUT_SCHEMA);
- result.put("id", id);
- result.put("primaryColor", primaryColor);
- result.put("secondaryColor", secondaryColor);
- result.put("price", price);
- return result;
- }
-
- private GenericData.Record convertNested(GenericData.Record inputRecord) {
- GenericData.Record result = new GenericData.Record(
- TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
- result.put("l1", inputRecord.get("l1"));
- result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
- if (inputRecord.get("parent") != null) {
- // output schema doesn't have parent name.
- result.put("parentId",
- ((GenericData.Record) inputRecord.get("parent")).get("id"));
- }
- return result;
- }
-
- private GenericData.Record dataNested(long id, String companyName, Long parentId,
- String parentName) {
- GenericData.Record result = new GenericData.Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
- result.put("l1", id);
- result.put("s1", companyName);
- if (parentId != null || parentName != null) {
- GenericData.Record parent = new GenericData.Record(
- TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
- parent.put("id", parentId);
- parent.put("name", parentName);
- result.put("parent", parent);
- }
- return result;
- }
-}
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
new file mode 100644
index 0000000000..cf56f5b2c7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json
@@ -0,0 +1,10 @@
+{
+ "shapes":
+ [
+ {"shape": "circle", "color": "red", "width": 100, "height": 100},
+ {"shape": "square", "color": "red", "width": 100, "height": 100},
+ {"shape": "sphere", "color": "red", "width": 100, "height": 100},
+ {"shape": "triangle", "color": "red", "width": 100, "height": 100},
+ {"shape": "rectangle", "color": "red", "width": 100, "height": 100}
+ ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
new file mode 100644
index 0000000000..e5f91e40ee
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes.json.avro
@@ -0,0 +1,34 @@
+{
+ "type" : "record",
+ "name" : "contact",
+ "namespace" : "com.jeremydyer",
+ "fields" : [ {
+ "name" : "shapes",
+ "type" : {
+ "type" : "array",
+ "items" : {
+ "type" : "record",
+ "name" : "shapes",
+ "namespace" : "",
+ "fields" : [ {
+ "name" : "shape",
+ "type" : "string",
+ "doc" : "Type inferred from '\"circle\"'"
+ }, {
+ "name" : "color",
+ "type" : "string",
+ "doc" : "Type inferred from '\"red\"'"
+ }, {
+ "name" : "width",
+ "type" : "int",
+ "doc" : "Type inferred from '100'"
+ }, {
+ "name" : "height",
+ "type" : "int",
+ "doc" : "Type inferred from '100'"
+ } ]
+ }
+ },
+ "doc" : "Type inferred from '[{\"shape\":\"circle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"square\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"sphere\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"triangle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"rectangle\",\"color\":\"red\",\"width\":100,\"height\":100}]'"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
new file mode 100644
index 0000000000..d2dbc6dead
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/ShapesHeader.csv
@@ -0,0 +1 @@
+shape,color,width,height
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
new file mode 100644
index 0000000000..5d4aeb001e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_Header.csv
@@ -0,0 +1,352 @@
+shape,color,width,height
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_NoHeader.csv b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_NoHeader.csv
new file mode 100644
index 0000000000..a3e3017aa7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_NoHeader.csv
@@ -0,0 +1,351 @@
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+square,red,100,100
+sphere,red,100,100
+triangle,red,100,100
+rectangle,red,100,100
+circle,red,100,100
+sphere,red,100,100
+circle,red,100,100
+circle,red,100,100
+triangle,red,100,100
+cone,red,100,100
+circle,red,100,100
+rectangle,red,100,100
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_header.csv.avro b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_header.csv.avro
new file mode 100644
index 0000000000..2c35ea346f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/resources/Shapes_header.csv.avro
@@ -0,0 +1,23 @@
+{
+ "type" : "record",
+ "name" : "contact",
+ "namespace" : "com.jeremydyer",
+ "doc" : "Schema generated by Kite",
+ "fields" : [ {
+ "name" : "shape",
+ "type" : "string",
+ "doc" : "Type inferred from 'circle'"
+ }, {
+ "name" : "color",
+ "type" : "string",
+ "doc" : "Type inferred from 'red'"
+ }, {
+ "name" : "width",
+ "type" : "long",
+ "doc" : "Type inferred from '100'"
+ }, {
+ "name" : "height",
+ "type" : "long",
+ "doc" : "Type inferred from '100'"
+ } ]
+}
\ No newline at end of file