diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java new file mode 100644 index 0000000000..80a8b92ca6 --- /dev/null +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromCSV.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.kite; + + +import java.io.IOException; +import java.io.BufferedReader; +import java.io.OutputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Set; +import java.util.HashSet; +import java.util.Collections; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.kitesdk.data.spi.filesystem.CSVProperties; +import org.kitesdk.data.spi.filesystem.CSVUtil; +import org.kitesdk.shaded.com.google.common.collect.ImmutableSet; + +@Tags({"kite", "csv", "avro", "infer", "schema"}) +@SeeAlso({InferAvroSchemaFromCSV.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Creates an Avro schema from a CSV file header. The header line definition can either be provided" + + "as a property to the processor OR present in the first line of CSV in the incoming FlowFile content. 
If a header" + " property is specified for this processor no attempt will be made to use the header line that may be present" + " in the incoming CSV FlowFile content.") +public class InferAvroSchemaFromCSV + extends AbstractKiteProcessor { + + public static final String CSV_DELIMITER = ","; + + public static final PropertyDescriptor HEADER_LINE = new PropertyDescriptor.Builder() + .name("CSV Header Line") + .description("Comma separated string defining the column names expected in the CSV data. " + + "EX: \"fname,lname,zip,address\"") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder() + .name("CSV Header Line Skip Count") + .description("Specifies the number of header lines that should be skipped when reading the CSV data. If the " + + " first line of the CSV data is a header line and you specify the \"CSV Header Line\" property " + + "you need to set this value to 1 otherwise the header line will be treated as actual data.") + .required(true) + .defaultValue("0") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder() + .name("CSV escape string") + .description("String that represents an escape sequence in the CSV FlowFile content data.") + .required(true) + .defaultValue("\\") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder() + .name("CSV quote string") + .description("String that represents a literal quote character in the CSV FlowFile content data.") + .required(true) + .defaultValue("'") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder() + .name("Avro Record Name") + .description("Value to be placed in the Avro record schema 
\"name\" field.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Charset") + .description("Character encoding of CSV data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder() + .name("Pretty Avro Output") + .description("If true the Avro output will be formatted.") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully created Avro schema for CSV data.").build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") + .description("Original incoming FlowFile CSV data").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to create Avro schema for CSV data.").build(); + + private List properties; + private Set relationships; + + @Override + protected void init(final ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(HEADER_LINE); + properties.add(HEADER_LINE_SKIP_COUNT); + properties.add(ESCAPE_STRING); + properties.add(QUOTE_STRING); + properties.add(PRETTY_AVRO_OUTPUT); + properties.add(RECORD_NAME); + properties.add(CHARSET); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_ORIGINAL); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + 
@Override + public Set getRelationships() { + return relationships; + } + + + @Override + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + try { + + //Determines the header line either from the property input or the first line of the delimited file. + final AtomicReference header = new AtomicReference<>(); + final AtomicReference hasHeader = new AtomicReference<>(); + + if (context.getProperty(HEADER_LINE).isSet()) { + header.set(context.getProperty(HEADER_LINE).getValue()); + hasHeader.set(Boolean.FALSE); + } else { + //Read the first line of the file to get the header value. + session.read(original, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + BufferedReader br = new BufferedReader(new InputStreamReader(in)); + header.set(br.readLine()); + hasHeader.set(Boolean.TRUE); + br.close(); + } + }); + } + + //Prepares the CSVProperties for kite + final CSVProperties props = new CSVProperties.Builder() + .delimiter(CSV_DELIMITER) + .escape(context.getProperty(ESCAPE_STRING).getValue()) + .quote(context.getProperty(QUOTE_STRING).getValue()) + .header(header.get()) + .hasHeader(hasHeader.get()) + .linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).asInteger()) + .charset(context.getProperty(CHARSET).getValue()) + .build(); + + final Set required = ImmutableSet.of(); + final AtomicReference avroSchema = new AtomicReference<>(); + + session.read(original, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + avroSchema.set(CSVUtil + .inferNullableSchema( + context.getProperty(RECORD_NAME).getValue(), in, props, required) + .toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean())); + } + }); + + FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() { + @Override + public void process(OutputStream 
out) throws IOException { + out.write(avroSchema.get().getBytes()); + } + }); + + //Transfer the sessions. + session.transfer(original, REL_ORIGINAL); + session.transfer(avroSchemaFF, REL_SUCCESS); + + } catch (Exception ex) { + getLogger().error(ex.getMessage()); + session.transfer(original, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java new file mode 100644 index 0000000000..77029ff304 --- /dev/null +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/InferAvroSchemaFromJSON.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.kite; + +import org.apache.avro.Schema; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.kitesdk.data.spi.JsonUtil; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.HashSet; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; + +@Tags({"kite", "json", "avro", "infer", "schema"}) +@SeeAlso({InferAvroSchemaFromJSON.class}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Creates an Avro schema from JSON data. The Avro schema is inferred by examining the fields " + + "in the JSON input. 
The Avro schema generated by kite will use the same names present in the incoming JSON payload") +public class InferAvroSchemaFromJSON + extends AbstractKiteProcessor { + + public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder() + .name("Avro Record Name") + .description("Value to be placed in the Avro record schema \"name\" field.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder() + .name("Number of records to analyze") + .description("Number of records that should be analyzed by kite to infer the Avro schema") + .required(true) + .defaultValue("10") + .addValidator(StandardValidators.INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Charset") + .description("Character encoding of JSON data.") + .required(true) + .defaultValue("UTF-8") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder() + .name("Pretty Avro Output") + .description("If true the Avro output will be formatted.") + .required(true) + .defaultValue("true") + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Successfully created Avro schema for JSON data.").build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original") + .description("Original incoming FlowFile JSON data").build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Failed to create Avro schema for JSON data.").build(); + + private List properties; + private Set relationships; + + @Override + protected void init(final 
ProcessorInitializationContext context) { + final List properties = new ArrayList<>(); + properties.add(CHARSET); + properties.add(PRETTY_AVRO_OUTPUT); + properties.add(RECORD_NAME); + properties.add(NUM_RECORDS_TO_ANALYZE); + this.properties = Collections.unmodifiableList(properties); + + final Set relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + relationships.add(REL_ORIGINAL); + this.relationships = Collections.unmodifiableSet(relationships); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set getRelationships() { + return relationships; + } + + + @Override + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + try { + + final AtomicReference avroSchema = new AtomicReference<>(); + session.read(original, new InputStreamCallback() { + @Override + public void process(InputStream in) throws IOException { + Schema as = JsonUtil.inferSchema( + in, context.getProperty(RECORD_NAME).getValue(), context.getProperty(NUM_RECORDS_TO_ANALYZE).asInteger()); + avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean())); + + } + }); + + FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write(avroSchema.get().getBytes()); + } + }); + + //Transfer the FlowFiles + session.transfer(original, REL_ORIGINAL); + session.transfer(avroSchemaFF, REL_SUCCESS); + + } catch (Exception ex) { + getLogger().error(ex.getMessage()); + session.transfer(original, REL_FAILURE); + } + } +} diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor 
b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index ea99ff6548..7a89856f24 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -16,3 +16,5 @@ org.apache.nifi.processors.kite.StoreInKiteDataset org.apache.nifi.processors.kite.ConvertCSVToAvro org.apache.nifi.processors.kite.ConvertJSONToAvro org.apache.nifi.processors.kite.ConvertAvroSchema +org.apache.nifi.processors.kite.InferAvroSchemaFromCSV +org.apache.nifi.processors.kite.InferAvroSchemaFromJSON diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java new file mode 100644 index 0000000000..78c4eabfe1 --- /dev/null +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromCSV.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; +; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class TestInferAvroSchemaFromCSV { + + private final String CSV_HEADER_LINE = "fname,lname,age,zip"; + + @Test + public void inferSchemaFromHeaderLineOfCSV() throws Exception { + TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class); + + runner.assertNotValid(); + runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "0"); + runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\"); + runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'"); + runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact"); + runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8"); + runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true"); + + runner.assertValid(); + + ProcessSession session = runner.getProcessSessionFactory().createSession(); + FlowFile ff = session.write(session.create(), new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes()); + } + }); + + //Enqueue the empty FlowFile + runner.enqueue(ff); + runner.run(); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1); + } + + @Test + public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception { + TestRunner runner = 
TestRunners.newTestRunner(InferAvroSchemaFromCSV.class); + + runner.assertNotValid(); + runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE); + runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1"); + runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\"); + runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'"); + runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact"); + runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8"); + runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true"); + + runner.assertValid(); + + ProcessSession session = runner.getProcessSessionFactory().createSession(); + FlowFile ff = session.write(session.create(), new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes()); + } + }); + + //Enqueue the empty FlowFile + runner.enqueue(ff); + runner.run(); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1); + } + + @Test + public void inferSchemaFromEmptyContent() throws Exception { + TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class); + + runner.assertNotValid(); + runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE); + runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1"); + runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\"); + runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'"); + runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact"); + runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8"); + runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true"); + + runner.assertValid(); + + ProcessSession session = runner.getProcessSessionFactory().createSession(); + FlowFile ff = 
session.write(session.create(), new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) throws IOException { + out.write("".getBytes()); + } + }); + + //Enqueue the empty FlowFile + runner.enqueue(ff); + runner.run(); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 1); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 0); + runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 0); + } + +} diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java new file mode 100644 index 0000000000..1c63ba172f --- /dev/null +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestInferAvroSchemaFromJSON.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.nifi.processors.kite; + +import com.google.common.collect.Lists; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.List; + +import static org.apache.nifi.processors.kite.TestUtil.streamFor; + +public class TestInferAvroSchemaFromJSON { + + public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest") + .fields().requiredString("id").requiredString("primaryColor") + .optionalString("secondaryColor").optionalString("price") + .endRecord(); + + public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test") + .fields().requiredLong("id").requiredString("color") + .optionalDouble("price").endRecord(); + + public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]"; + + public static final String FAILURE_SUMMARY = "Cannot convert free to double"; + + @Test + public void testBasicConversion() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, + INPUT_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, + OUTPUT_SCHEMA.toString()); + runner.setProperty("primaryColor", "color"); + runner.assertValid(); + + // Two valid rows, and one invalid because "free" is not a double. 
+ GenericData.Record goodRecord1 = dataBasic("1", "blue", null, null); + GenericData.Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5"); + GenericData.Record badRecord = dataBasic("3", "red", "yellow", "free"); + List input = Lists.newArrayList(goodRecord1, goodRecord2, + badRecord); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 rows", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship( + "failure").get(0); + GenericDatumReader reader = new GenericDatumReader( + INPUT_SCHEMA); + DataFileStream stream = new DataFileStream( + new ByteArrayInputStream( + runner.getContentAsByteArray(incompatible)), reader); + int count = 0; + for (GenericData.Record r : stream) { + Assert.assertEquals(badRecord, r); + count++; + } + stream.close(); + Assert.assertEquals(1, count); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + + GenericDatumReader successReader = new GenericDatumReader( + OUTPUT_SCHEMA); + DataFileStream successStream = new DataFileStream( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + count = 0; + for (GenericData.Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertBasic(goodRecord1), r); + } else { + Assert.assertEquals(convertBasic(goodRecord2), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + + @Test + public void testNestedConversion() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + 
runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, + TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, + TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString()); + runner.setProperty("parent.id", "parentId"); + runner.assertValid(); + + // Two valid rows + GenericData.Record goodRecord1 = dataNested(1L, "200", null, null); + GenericData.Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany"); + List input = Lists.newArrayList(goodRecord1, goodRecord2); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 0 rows", 0, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + + GenericDatumReader successReader = new GenericDatumReader( + TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA); + DataFileStream successStream = new DataFileStream( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + int count = 0; + for (GenericData.Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertNested(goodRecord1), r); + } else { + Assert.assertEquals(convertNested(goodRecord2), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + + private GenericData.Record convertBasic(GenericData.Record inputRecord) { + GenericData.Record result = new GenericData.Record(OUTPUT_SCHEMA); + result.put("id", Long.parseLong(inputRecord.get("id").toString())); + result.put("color", inputRecord.get("primaryColor").toString()); + if (inputRecord.get("price") == null) { + result.put("price", null); + } else { + result.put("price", + Double.parseDouble(inputRecord.get("price").toString())); + } + return result; + } + + private 
GenericData.Record dataBasic(String id, String primaryColor, + String secondaryColor, String price) { + GenericData.Record result = new GenericData.Record(INPUT_SCHEMA); + result.put("id", id); + result.put("primaryColor", primaryColor); + result.put("secondaryColor", secondaryColor); + result.put("price", price); + return result; + } + + private GenericData.Record convertNested(GenericData.Record inputRecord) { + GenericData.Record result = new GenericData.Record( + TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA); + result.put("l1", inputRecord.get("l1")); + result.put("s1", Long.parseLong(inputRecord.get("s1").toString())); + if (inputRecord.get("parent") != null) { + // output schema doesn't have parent name. + result.put("parentId", + ((GenericData.Record) inputRecord.get("parent")).get("id")); + } + return result; + } + + private GenericData.Record dataNested(long id, String companyName, Long parentId, + String parentName) { + GenericData.Record result = new GenericData.Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA); + result.put("l1", id); + result.put("s1", companyName); + if (parentId != null || parentName != null) { + GenericData.Record parent = new GenericData.Record( + TestAvroRecordConverter.NESTED_PARENT_SCHEMA); + parent.put("id", parentId); + parent.put("name", parentName); + result.put("parent", parent); + } + return result; + } +}