mirror of https://github.com/apache/nifi.git
NIFI-1356
This commit is contained in:
parent
dc2e8ce512
commit
7008a3054e
|
@ -130,4 +130,21 @@
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.rat</groupId>
|
||||||
|
<artifactId>apache-rat-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<excludes combine.children="append">
|
||||||
|
<exclude>src/test/resources/*.csv</exclude>
|
||||||
|
<exclude>src/test/resources/*.json</exclude>
|
||||||
|
<exclude>src/test/resources/*.avro</exclude>
|
||||||
|
</excludes>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -0,0 +1,460 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.kite;
|
||||||
|
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.ReadsAttribute;
|
||||||
|
import org.apache.nifi.annotation.behavior.ReadsAttributes;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||||
|
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||||
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.processor.ProcessContext;
|
||||||
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
|
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||||
|
import org.apache.nifi.processor.Relationship;
|
||||||
|
import org.apache.nifi.processor.exception.ProcessException;
|
||||||
|
import org.apache.nifi.processor.io.InputStreamCallback;
|
||||||
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.kitesdk.data.spi.JsonUtil;
|
||||||
|
import org.kitesdk.data.spi.filesystem.CSVProperties;
|
||||||
|
import org.kitesdk.data.spi.filesystem.CSVUtil;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStreamReader;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.io.BufferedReader;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@Tags({"kite", "avro", "infer", "schema", "csv", "json"})
|
||||||
|
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
||||||
|
@CapabilityDescription("Examines the contents of the incoming FlowFile to infer an Avro schema. The processor will" +
|
||||||
|
" use the Kite SDK to make an attempt to automatically generate an Avro schema from the incoming content." +
|
||||||
|
" When inferring the schema from JSON data the key names will be used in the resulting Avro schema" +
|
||||||
|
" definition. When inferring from CSV data a \"header definition\" must be present either as the first line of the incoming data" +
|
||||||
|
" or the \"header definition\" must be explicitly set in the property \"CSV Header Definition\". A \"header definition\"" +
|
||||||
|
" is simply a single comma separated line defining the names of each column. The \"header definition\" is" +
|
||||||
|
" required in order to determine the names that should be given to each field in the resulting Avro definition." +
|
||||||
|
" When inferring data types the higher order data type is always used if there is ambiguity." +
|
||||||
|
" For example when examining numerical values the type may be set to \"long\" instead of \"integer\" since a long can" +
|
||||||
|
" safely hold the value of any \"integer\". Only CSV and JSON content is currently supported for automatically inferring an" +
|
||||||
|
" Avro schema. The type of content present in the incoming FlowFile is set by using the property \"Input Content Type\"." +
|
||||||
|
" The property can either be explicitly set to CSV, JSON, or \"use mime.type value\" which will examine the" +
|
||||||
|
" value of the mime.type attribute on the incoming FlowFile to determine the type of content present.")
|
||||||
|
@ReadsAttributes({
|
||||||
|
@ReadsAttribute(attribute = "mime.type", description = "If configured by property \"Input Content Type\" will" +
|
||||||
|
" use this value to determine what sort of content should be inferred from the incoming FlowFile content."),
|
||||||
|
})
|
||||||
|
@WritesAttributes({
|
||||||
|
@WritesAttribute(attribute = "inferred.avro.schema", description = "If configured by \"Schema output destination\" to" +
|
||||||
|
" write to an attribute this will hold the resulting Avro schema from inferring the incoming FlowFile content."),
|
||||||
|
})
|
||||||
|
public class InferAvroSchema
|
||||||
|
extends AbstractKiteProcessor {
|
||||||
|
|
||||||
|
public static final String CSV_DELIMITER = ",";
|
||||||
|
public static final String USE_MIME_TYPE = "use mime.type value";
|
||||||
|
public static final String JSON_CONTENT = "json";
|
||||||
|
public static final String CSV_CONTENT = "csv";
|
||||||
|
|
||||||
|
public static final String AVRO_SCHEMA_ATTRIBUTE_NAME = "inferred.avro.schema";
|
||||||
|
public static final String DESTINATION_ATTRIBUTE = "flowfile-attribute";
|
||||||
|
public static final String DESTINATION_CONTENT = "flowfile-content";
|
||||||
|
public static final String JSON_MIME_TYPE = "application/json";
|
||||||
|
public static final String CSV_MIME_TYPE = "text/csv";
|
||||||
|
public static final String AVRO_MIME_TYPE = "application/avro-binary";
|
||||||
|
public static final String AVRO_FILE_EXTENSION = ".avro";
|
||||||
|
|
||||||
|
public static final PropertyDescriptor SCHEMA_DESTINATION = new PropertyDescriptor.Builder()
|
||||||
|
.name("Schema Output Destination")
|
||||||
|
.description("Control if Avro schema is written as a new flowfile attribute '" + AVRO_SCHEMA_ATTRIBUTE_NAME + "' " +
|
||||||
|
"or written in the flowfile content. Writing to flowfile content will overwrite any " +
|
||||||
|
"existing flowfile content.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues(DESTINATION_ATTRIBUTE, DESTINATION_CONTENT)
|
||||||
|
.defaultValue(DESTINATION_CONTENT)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor INPUT_CONTENT_TYPE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Input Content Type")
|
||||||
|
.description("Content Type of data present in the incoming FlowFile's content. Only \"" +
|
||||||
|
JSON_CONTENT + "\" or \"" + CSV_CONTENT + "\" are supported." +
|
||||||
|
" If this value is set to \"" + USE_MIME_TYPE + "\" the incoming Flowfile's attribute \"" + CoreAttributes.MIME_TYPE + "\"" +
|
||||||
|
" will be used to determine the Content Type.")
|
||||||
|
.allowableValues(USE_MIME_TYPE, JSON_CONTENT, CSV_CONTENT)
|
||||||
|
.defaultValue(USE_MIME_TYPE)
|
||||||
|
.required(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor GET_CSV_HEADER_DEFINITION_FROM_INPUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("Get CSV Header Definition From Data")
|
||||||
|
.description("This property only applies to CSV content type. If \"true\" the processor will attempt to read the CSV header definition from the" +
|
||||||
|
" first line of the input data.")
|
||||||
|
.required(true)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("true")
|
||||||
|
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor CSV_HEADER_DEFINITION = new PropertyDescriptor.Builder()
|
||||||
|
.name("CSV Header Definition")
|
||||||
|
.description("This property only applies to CSV content type. Comma separated string defining the column names expected in the CSV data." +
|
||||||
|
" EX: \"fname,lname,zip,address\". The elements present in this string should be in the same order" +
|
||||||
|
" as the underlying data. Setting this property will cause the value of" +
|
||||||
|
" \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" to be ignored instead using this value.")
|
||||||
|
.required(false)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.defaultValue(null)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder()
|
||||||
|
.name("CSV Header Line Skip Count")
|
||||||
|
.description("This property only applies to CSV content type. Specifies the number of lines that should be skipped when reading the CSV data." +
|
||||||
|
" Setting this value to 0 is equivalent to saying \"the entire contents of the file should be read\". If the" +
|
||||||
|
" property \"" + GET_CSV_HEADER_DEFINITION_FROM_INPUT.getName() + "\" is set then the first line of the CSV " +
|
||||||
|
" file will be read in and treated as the CSV header definition. Since this will remove the header line from the data" +
|
||||||
|
" care should be taken to make sure the value of \"CSV header Line Skip Count\" is set to 0 to ensure" +
|
||||||
|
" no data is skipped.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("0")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder()
|
||||||
|
.name("CSV Escape String")
|
||||||
|
.description("This property only applies to CSV content type. String that represents an escape sequence" +
|
||||||
|
" in the CSV FlowFile content data.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("\\")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder()
|
||||||
|
.name("CSV Quote String")
|
||||||
|
.description("This property only applies to CSV content type. String that represents a literal quote" +
|
||||||
|
" character in the CSV FlowFile content data.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("'")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
|
||||||
|
.name("Avro Record Name")
|
||||||
|
.description("Value to be placed in the Avro record schema \"name\" field.")
|
||||||
|
.required(true)
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
||||||
|
.name("Charset")
|
||||||
|
.description("Character encoding of CSV data.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("UTF-8")
|
||||||
|
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
|
||||||
|
.name("Pretty Avro Output")
|
||||||
|
.description("If true the Avro output will be formatted.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("true")
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
|
||||||
|
.name("Number Of Records To Analyze")
|
||||||
|
.description("This property only applies to JSON content type. The number of JSON records that should be" +
|
||||||
|
" examined to determine the Avro schema. The higher the value the better chance kite has of detecting" +
|
||||||
|
" the appropriate type. However the default value of 10 is almost always enough.")
|
||||||
|
.required(true)
|
||||||
|
.defaultValue("10")
|
||||||
|
.expressionLanguageSupported(true)
|
||||||
|
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
|
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
||||||
|
.description("Successfully created Avro schema from data.").build();
|
||||||
|
|
||||||
|
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
|
||||||
|
.description("Original incoming FlowFile data").build();
|
||||||
|
|
||||||
|
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
||||||
|
.description("Failed to create Avro schema from data.").build();
|
||||||
|
|
||||||
|
public static final Relationship REL_UNSUPPORTED_CONTENT = new Relationship.Builder().name("unsupported content")
|
||||||
|
.description("The content found in the flowfile content is not of the required format.").build();
|
||||||
|
|
||||||
|
private List<PropertyDescriptor> properties;
|
||||||
|
private Set<Relationship> relationships;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void init(final ProcessorInitializationContext context) {
|
||||||
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
|
properties.add(SCHEMA_DESTINATION);
|
||||||
|
properties.add(INPUT_CONTENT_TYPE);
|
||||||
|
properties.add(CSV_HEADER_DEFINITION);
|
||||||
|
properties.add(GET_CSV_HEADER_DEFINITION_FROM_INPUT);
|
||||||
|
properties.add(HEADER_LINE_SKIP_COUNT);
|
||||||
|
properties.add(ESCAPE_STRING);
|
||||||
|
properties.add(QUOTE_STRING);
|
||||||
|
properties.add(PRETTY_AVRO_OUTPUT);
|
||||||
|
properties.add(RECORD_NAME);
|
||||||
|
properties.add(NUM_RECORDS_TO_ANALYZE);
|
||||||
|
properties.add(CHARSET);
|
||||||
|
this.properties = Collections.unmodifiableList(properties);
|
||||||
|
|
||||||
|
final Set<Relationship> relationships = new HashSet<>();
|
||||||
|
relationships.add(REL_SUCCESS);
|
||||||
|
relationships.add(REL_FAILURE);
|
||||||
|
relationships.add(REL_ORIGINAL);
|
||||||
|
relationships.add(REL_UNSUPPORTED_CONTENT);
|
||||||
|
this.relationships = Collections.unmodifiableSet(relationships);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<Relationship> getRelationships() {
|
||||||
|
return relationships;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
|
final FlowFile original = session.get();
|
||||||
|
if (original == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
|
||||||
|
final AtomicReference<String> avroSchema = new AtomicReference<>();
|
||||||
|
switch (context.getProperty(INPUT_CONTENT_TYPE).getValue()) {
|
||||||
|
case USE_MIME_TYPE:
|
||||||
|
avroSchema.set(inferAvroSchemaFromMimeType(original, context, session));
|
||||||
|
break;
|
||||||
|
case JSON_CONTENT:
|
||||||
|
avroSchema.set(inferAvroSchemaFromJSON(original, context, session));
|
||||||
|
break;
|
||||||
|
case CSV_CONTENT:
|
||||||
|
avroSchema.set(inferAvroSchemaFromCSV(original, context, session));
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
//Shouldn't be possible but just in case
|
||||||
|
session.transfer(original, REL_UNSUPPORTED_CONTENT);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (StringUtils.isNotEmpty(avroSchema.get())) {
|
||||||
|
|
||||||
|
String destination = context.getProperty(SCHEMA_DESTINATION).getValue();
|
||||||
|
FlowFile avroSchemaFF = null;
|
||||||
|
|
||||||
|
switch (destination) {
|
||||||
|
case DESTINATION_ATTRIBUTE:
|
||||||
|
avroSchemaFF = session.putAttribute(session.clone(original), AVRO_SCHEMA_ATTRIBUTE_NAME, avroSchema.get());
|
||||||
|
//Leaves the original CoreAttributes.MIME_TYPE in place.
|
||||||
|
break;
|
||||||
|
case DESTINATION_CONTENT:
|
||||||
|
avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(OutputStream out) throws IOException {
|
||||||
|
out.write(avroSchema.get().getBytes());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.MIME_TYPE.key(), AVRO_MIME_TYPE);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Transfer the sessions.
|
||||||
|
avroSchemaFF = session.putAttribute(avroSchemaFF, CoreAttributes.FILENAME.key(), (original.getAttribute(CoreAttributes.FILENAME.key()) + AVRO_FILE_EXTENSION));
|
||||||
|
session.transfer(avroSchemaFF, REL_SUCCESS);
|
||||||
|
session.transfer(original, REL_ORIGINAL);
|
||||||
|
} else {
|
||||||
|
//If the avroSchema is null then the content type is unknown and therefore unsupported
|
||||||
|
session.transfer(original, REL_UNSUPPORTED_CONTENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (Exception ex) {
|
||||||
|
getLogger().error("Failed to infer Avro schema for {} due to {}", new Object[] {original, ex});
|
||||||
|
session.transfer(original, REL_FAILURE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Infers the Avro schema from the input Flowfile content. To infer an Avro schema for CSV content a header line is
|
||||||
|
* required. You can configure the processor to pull that header line from the first line of the CSV data if it is
|
||||||
|
* present OR you can manually supply the desired header line as a property value.
|
||||||
|
*
|
||||||
|
* @param inputFlowFile
|
||||||
|
* The original input FlowFile containing the CSV content as it entered this processor.
|
||||||
|
*
|
||||||
|
* @param context
|
||||||
|
* ProcessContext to pull processor configurations.
|
||||||
|
*
|
||||||
|
* @param session
|
||||||
|
* ProcessSession to transfer FlowFiles
|
||||||
|
*/
|
||||||
|
private String inferAvroSchemaFromCSV(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
|
||||||
|
|
||||||
|
//Determines the header line either from the property input or the first line of the delimited file.
|
||||||
|
final AtomicReference<String> header = new AtomicReference<>();
|
||||||
|
final AtomicReference<Boolean> hasHeader = new AtomicReference<>();
|
||||||
|
|
||||||
|
if (context.getProperty(GET_CSV_HEADER_DEFINITION_FROM_INPUT).asBoolean() == Boolean.TRUE) {
|
||||||
|
//Read the first line of the file to get the header value.
|
||||||
|
session.read(inputFlowFile, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(InputStream in) throws IOException {
|
||||||
|
BufferedReader br = new BufferedReader(new InputStreamReader(in));
|
||||||
|
header.set(br.readLine());
|
||||||
|
hasHeader.set(Boolean.TRUE);
|
||||||
|
br.close();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
hasHeader.set(Boolean.TRUE);
|
||||||
|
} else {
|
||||||
|
header.set(context.getProperty(CSV_HEADER_DEFINITION).evaluateAttributeExpressions(inputFlowFile).getValue());
|
||||||
|
hasHeader.set(Boolean.FALSE);
|
||||||
|
}
|
||||||
|
|
||||||
|
//Prepares the CSVProperties for kite
|
||||||
|
final CSVProperties props = new CSVProperties.Builder()
|
||||||
|
.delimiter(CSV_DELIMITER)
|
||||||
|
.escape(context.getProperty(ESCAPE_STRING).evaluateAttributeExpressions().getValue())
|
||||||
|
.quote(context.getProperty(QUOTE_STRING).evaluateAttributeExpressions().getValue())
|
||||||
|
.header(header.get())
|
||||||
|
.hasHeader(hasHeader.get())
|
||||||
|
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).evaluateAttributeExpressions().asInteger())
|
||||||
|
.charset(context.getProperty(CHARSET).getValue())
|
||||||
|
.build();
|
||||||
|
|
||||||
|
final AtomicReference<String> avroSchema = new AtomicReference<>();
|
||||||
|
|
||||||
|
session.read(inputFlowFile, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(InputStream in) throws IOException {
|
||||||
|
avroSchema.set(CSVUtil
|
||||||
|
.inferSchema(
|
||||||
|
context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(), in, props)
|
||||||
|
.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return avroSchema.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Infers the Avro schema from the input Flowfile content.
|
||||||
|
*
|
||||||
|
* @param inputFlowFile
|
||||||
|
* The original input FlowFile containing the JSON content as it entered this processor.
|
||||||
|
*
|
||||||
|
* @param context
|
||||||
|
* ProcessContext to pull processor configurations.
|
||||||
|
*
|
||||||
|
* @param session
|
||||||
|
* ProcessSession to transfer FlowFiles
|
||||||
|
*/
|
||||||
|
private String inferAvroSchemaFromJSON(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
|
||||||
|
|
||||||
|
final AtomicReference<String> avroSchema = new AtomicReference<>();
|
||||||
|
session.read(inputFlowFile, new InputStreamCallback() {
|
||||||
|
@Override
|
||||||
|
public void process(InputStream in) throws IOException {
|
||||||
|
Schema as = JsonUtil.inferSchema(
|
||||||
|
in, context.getProperty(RECORD_NAME).evaluateAttributeExpressions().getValue(),
|
||||||
|
context.getProperty(NUM_RECORDS_TO_ANALYZE).evaluateAttributeExpressions().asInteger());
|
||||||
|
avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return avroSchema.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Examines the incoming FlowFiles mime.type attribute to determine if the schema should be inferred for CSV or JSON data.
|
||||||
|
*
|
||||||
|
* @param inputFlowFile
|
||||||
|
* The original input FlowFile containing the content.
|
||||||
|
*
|
||||||
|
* @param context
|
||||||
|
* ProcessContext to pull processor configurations.
|
||||||
|
*
|
||||||
|
* @param session
|
||||||
|
* ProcessSession to transfer FlowFiles
|
||||||
|
*/
|
||||||
|
private String inferAvroSchemaFromMimeType(final FlowFile inputFlowFile, final ProcessContext context, final ProcessSession session) {
|
||||||
|
|
||||||
|
String mimeType = inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key());
|
||||||
|
String avroSchema = "";
|
||||||
|
|
||||||
|
if (mimeType!= null) {
|
||||||
|
switch (mimeType) {
|
||||||
|
case JSON_MIME_TYPE:
|
||||||
|
getLogger().debug("Inferred content type as JSON from \"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
|
||||||
|
inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
|
||||||
|
avroSchema = inferAvroSchemaFromJSON(inputFlowFile, context, session);
|
||||||
|
break;
|
||||||
|
case CSV_MIME_TYPE:
|
||||||
|
getLogger().debug("Inferred content type as CSV from \"{}\" value of \"{}\"", new Object[]{CoreAttributes.MIME_TYPE.key(),
|
||||||
|
inputFlowFile.getAttribute(CoreAttributes.MIME_TYPE.key())});
|
||||||
|
avroSchema = inferAvroSchemaFromCSV(inputFlowFile, context, session);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
getLogger().warn("Unable to infer Avro Schema from {} because its mime type is {}, " +
|
||||||
|
" which is not supported by this Processor", new Object[] {inputFlowFile, mimeType} );
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return avroSchema;
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,235 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.kite;
|
|
||||||
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.BufferedReader;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
|
||||||
import org.apache.nifi.processor.Relationship;
|
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
import org.kitesdk.data.spi.filesystem.CSVProperties;
|
|
||||||
import org.kitesdk.data.spi.filesystem.CSVUtil;
|
|
||||||
import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
|
|
||||||
|
|
||||||
|
|
||||||
@Tags({"kite", "csv", "avro", "infer", "schema"})
|
|
||||||
@SeeAlso({InferAvroSchemaFromCSV.class})
|
|
||||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
|
||||||
@CapabilityDescription("Creates an Avro schema from a CSV file header. The header line definition can either be provided" +
|
|
||||||
"as a property to the processor OR present in the first line of CSV in the incoming FlowFile content. If a header" +
|
|
||||||
" property is specified for this processor no attempt will be made to use the header line that may be present" +
|
|
||||||
" in the incoming CSV FlowFile content.")
|
|
||||||
public class InferAvroSchemaFromCSV
|
|
||||||
extends AbstractKiteProcessor {
|
|
||||||
|
|
||||||
public static final String CSV_DELIMITER = ",";
|
|
||||||
|
|
||||||
public static final PropertyDescriptor HEADER_LINE = new PropertyDescriptor.Builder()
|
|
||||||
.name("CSV Header Line")
|
|
||||||
.description("Comma separated string defining the column names expected in the CSV data. " +
|
|
||||||
"EX: \"fname,lname,zip,address\"")
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor HEADER_LINE_SKIP_COUNT = new PropertyDescriptor.Builder()
|
|
||||||
.name("CSV Header Line Skip Count")
|
|
||||||
.description("Specifies the number of header lines that should be skipped when reading the CSV data. If the " +
|
|
||||||
" first line of the CSV data is a header line and you specify the \"CSV Header Line\" property " +
|
|
||||||
"you need to set this vlaue to 1 otherwise the header line will be treated as actual data.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("0")
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor ESCAPE_STRING = new PropertyDescriptor.Builder()
|
|
||||||
.name("CSV escape string")
|
|
||||||
.description("String that represents an escape sequence in the CSV FlowFile content data.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("\\")
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor QUOTE_STRING = new PropertyDescriptor.Builder()
|
|
||||||
.name("CSV quote string")
|
|
||||||
.description("String that represents a literal quote character in the CSV FlowFile content data.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("'")
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
|
|
||||||
.name("Avro Record Name")
|
|
||||||
.description("Value to be placed in the Avro record schema \"name\" field.")
|
|
||||||
.required(true)
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
|
||||||
.name("Charset")
|
|
||||||
.description("Character encoding of CSV data.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("UTF-8")
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
|
|
||||||
.name("Pretty Avro Output")
|
|
||||||
.description("If true the Avro output will be formatted.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("true")
|
|
||||||
.allowableValues("true", "false")
|
|
||||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
|
||||||
.description("Successfully created Avro schema for CSV data.").build();
|
|
||||||
|
|
||||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
|
|
||||||
.description("Original incoming FlowFile CSV data").build();
|
|
||||||
|
|
||||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
|
||||||
.description("Failed to create Avro schema for CSV data.").build();
|
|
||||||
|
|
||||||
private List<PropertyDescriptor> properties;
|
|
||||||
private Set<Relationship> relationships;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
|
||||||
properties.add(HEADER_LINE);
|
|
||||||
properties.add(HEADER_LINE_SKIP_COUNT);
|
|
||||||
properties.add(ESCAPE_STRING);
|
|
||||||
properties.add(QUOTE_STRING);
|
|
||||||
properties.add(PRETTY_AVRO_OUTPUT);
|
|
||||||
properties.add(RECORD_NAME);
|
|
||||||
properties.add(CHARSET);
|
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
|
||||||
|
|
||||||
final Set<Relationship> relationships = new HashSet<>();
|
|
||||||
relationships.add(REL_SUCCESS);
|
|
||||||
relationships.add(REL_FAILURE);
|
|
||||||
relationships.add(REL_ORIGINAL);
|
|
||||||
this.relationships = Collections.unmodifiableSet(relationships);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
|
||||||
return properties;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<Relationship> getRelationships() {
|
|
||||||
return relationships;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
|
|
||||||
final FlowFile original = session.get();
|
|
||||||
if (original == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
//Determines the header line either from the property input or the first line of the delimited file.
|
|
||||||
final AtomicReference<String> header = new AtomicReference<>();
|
|
||||||
final AtomicReference<Boolean> hasHeader = new AtomicReference<>();
|
|
||||||
|
|
||||||
if (context.getProperty(HEADER_LINE).isSet()) {
|
|
||||||
header.set(context.getProperty(HEADER_LINE).getValue());
|
|
||||||
hasHeader.set(Boolean.FALSE);
|
|
||||||
} else {
|
|
||||||
//Read the first line of the file to get the header value.
|
|
||||||
session.read(original, new InputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in) throws IOException {
|
|
||||||
BufferedReader br = new BufferedReader(new InputStreamReader(in));
|
|
||||||
header.set(br.readLine());
|
|
||||||
hasHeader.set(Boolean.TRUE);
|
|
||||||
br.close();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
//Prepares the CSVProperties for kite
|
|
||||||
final CSVProperties props = new CSVProperties.Builder()
|
|
||||||
.delimiter(CSV_DELIMITER)
|
|
||||||
.escape(context.getProperty(ESCAPE_STRING).getValue())
|
|
||||||
.quote(context.getProperty(QUOTE_STRING).getValue())
|
|
||||||
.header(header.get())
|
|
||||||
.hasHeader(hasHeader.get())
|
|
||||||
.linesToSkip(context.getProperty(HEADER_LINE_SKIP_COUNT).asInteger())
|
|
||||||
.charset(context.getProperty(CHARSET).getValue())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
final Set<String> required = ImmutableSet.of();
|
|
||||||
final AtomicReference<String> avroSchema = new AtomicReference<>();
|
|
||||||
|
|
||||||
session.read(original, new InputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in) throws IOException {
|
|
||||||
avroSchema.set(CSVUtil
|
|
||||||
.inferNullableSchema(
|
|
||||||
context.getProperty(RECORD_NAME).getValue(), in, props, required)
|
|
||||||
.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(OutputStream out) throws IOException {
|
|
||||||
out.write(avroSchema.get().getBytes());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
//Transfer the sessions.
|
|
||||||
session.transfer(original, REL_ORIGINAL);
|
|
||||||
session.transfer(avroSchemaFF, REL_SUCCESS);
|
|
||||||
|
|
||||||
} catch (Exception ex) {
|
|
||||||
getLogger().error(ex.getMessage());
|
|
||||||
session.transfer(original, REL_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,164 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.kite;
|
|
||||||
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.processor.ProcessContext;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
|
||||||
import org.apache.nifi.processor.Relationship;
|
|
||||||
import org.apache.nifi.processor.exception.ProcessException;
|
|
||||||
import org.apache.nifi.processor.io.InputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
import org.kitesdk.data.spi.JsonUtil;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
@Tags({"kite", "json", "avro", "infer", "schema"})
|
|
||||||
@SeeAlso({InferAvroSchemaFromJSON.class})
|
|
||||||
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
|
|
||||||
@CapabilityDescription("Creates an Avro schema from JSON data. The Avro schema is inferred by examining the fields " +
|
|
||||||
"in the JSON input. The Avro schema generated by kite will use the same names present in the incoming JSON payload")
|
|
||||||
public class InferAvroSchemaFromJSON
|
|
||||||
extends AbstractKiteProcessor {
|
|
||||||
|
|
||||||
public static final PropertyDescriptor RECORD_NAME = new PropertyDescriptor.Builder()
|
|
||||||
.name("Avro Record Name")
|
|
||||||
.description("Value to be placed in the Avro record schema \"name\" field.")
|
|
||||||
.required(true)
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor NUM_RECORDS_TO_ANALYZE = new PropertyDescriptor.Builder()
|
|
||||||
.name("Number of records to analyze")
|
|
||||||
.description("Number of records that should be analyzed by kite to infer the Avro schema")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("10")
|
|
||||||
.addValidator(StandardValidators.INTEGER_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
|
|
||||||
.name("Charset")
|
|
||||||
.description("Character encoding of CSV data.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("UTF-8")
|
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
public static final PropertyDescriptor PRETTY_AVRO_OUTPUT = new PropertyDescriptor.Builder()
|
|
||||||
.name("Pretty Avro Output")
|
|
||||||
.description("If true the Avro output will be formatted.")
|
|
||||||
.required(true)
|
|
||||||
.defaultValue("true")
|
|
||||||
.allowableValues("true", "false")
|
|
||||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
|
||||||
.build();
|
|
||||||
|
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
|
|
||||||
.description("Successfully created Avro schema for JSON data.").build();
|
|
||||||
|
|
||||||
public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original")
|
|
||||||
.description("Original incoming FlowFile JSON data").build();
|
|
||||||
|
|
||||||
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
|
|
||||||
.description("Failed to create Avro schema for JSON data.").build();
|
|
||||||
|
|
||||||
private List<PropertyDescriptor> properties;
|
|
||||||
private Set<Relationship> relationships;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void init(final ProcessorInitializationContext context) {
|
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
|
||||||
properties.add(CHARSET);
|
|
||||||
properties.add(PRETTY_AVRO_OUTPUT);
|
|
||||||
properties.add(RECORD_NAME);
|
|
||||||
properties.add(NUM_RECORDS_TO_ANALYZE);
|
|
||||||
this.properties = Collections.unmodifiableList(properties);
|
|
||||||
|
|
||||||
final Set<Relationship> relationships = new HashSet<>();
|
|
||||||
relationships.add(REL_SUCCESS);
|
|
||||||
relationships.add(REL_FAILURE);
|
|
||||||
relationships.add(REL_ORIGINAL);
|
|
||||||
this.relationships = Collections.unmodifiableSet(relationships);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
|
||||||
return properties;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Set<Relationship> getRelationships() {
|
|
||||||
return relationships;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
|
|
||||||
final FlowFile original = session.get();
|
|
||||||
if (original == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
|
|
||||||
final AtomicReference<String> avroSchema = new AtomicReference<>();
|
|
||||||
session.read(original, new InputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in) throws IOException {
|
|
||||||
Schema as = JsonUtil.inferSchema(
|
|
||||||
in, context.getProperty(RECORD_NAME).getValue(), context.getProperty(NUM_RECORDS_TO_ANALYZE).asInteger());
|
|
||||||
avroSchema.set(as.toString(context.getProperty(PRETTY_AVRO_OUTPUT).asBoolean()));
|
|
||||||
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
FlowFile avroSchemaFF = session.write(session.create(), new OutputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(OutputStream out) throws IOException {
|
|
||||||
out.write(avroSchema.get().getBytes());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
//Transfer the FlowFiles
|
|
||||||
session.transfer(original, REL_ORIGINAL);
|
|
||||||
session.transfer(avroSchemaFF, REL_SUCCESS);
|
|
||||||
|
|
||||||
} catch (Exception ex) {
|
|
||||||
getLogger().error(ex.getMessage());
|
|
||||||
session.transfer(original, REL_FAILURE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -16,5 +16,4 @@ org.apache.nifi.processors.kite.StoreInKiteDataset
|
||||||
org.apache.nifi.processors.kite.ConvertCSVToAvro
|
org.apache.nifi.processors.kite.ConvertCSVToAvro
|
||||||
org.apache.nifi.processors.kite.ConvertJSONToAvro
|
org.apache.nifi.processors.kite.ConvertJSONToAvro
|
||||||
org.apache.nifi.processors.kite.ConvertAvroSchema
|
org.apache.nifi.processors.kite.ConvertAvroSchema
|
||||||
org.apache.nifi.processors.kite.InferAvroSchemaFromCSV
|
org.apache.nifi.processors.kite.InferAvroSchema
|
||||||
org.apache.nifi.processors.kite.InferAvroSchemaFromJSON
|
|
||||||
|
|
|
@ -0,0 +1,176 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.processors.kite;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
|
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||||
|
import org.apache.nifi.util.MockFlowFile;
|
||||||
|
import org.apache.nifi.util.TestRunner;
|
||||||
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.StringWriter;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.nio.file.StandardOpenOption;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class TestInferAvroSchema {
|
||||||
|
|
||||||
|
private TestRunner runner = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
runner = TestRunners.newTestRunner(InferAvroSchema.class);
|
||||||
|
|
||||||
|
//Prepare the common setup.
|
||||||
|
runner.assertNotValid();
|
||||||
|
|
||||||
|
runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
|
||||||
|
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
|
||||||
|
runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_CONTENT);
|
||||||
|
runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "0");
|
||||||
|
runner.setProperty(InferAvroSchema.ESCAPE_STRING, "\\");
|
||||||
|
runner.setProperty(InferAvroSchema.QUOTE_STRING, "'");
|
||||||
|
runner.setProperty(InferAvroSchema.RECORD_NAME, "com.jeremydyer.contact");
|
||||||
|
runner.setProperty(InferAvroSchema.CHARSET, "UTF-8");
|
||||||
|
runner.setProperty(InferAvroSchema.PRETTY_AVRO_OUTPUT, "true");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void inferAvroSchemaFromHeaderDefinitionOfCSVFile() throws Exception {
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
|
||||||
|
runner.enqueue(new File("src/test/resources/Shapes_Header.csv").toPath(), attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile flowFile = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
|
||||||
|
flowFile.assertContentEquals(new File("src/test/resources/Shapes_Header.csv.avro").toPath());
|
||||||
|
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void inferAvroSchemaFromJSONFile() throws Exception {
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.setProperty(InferAvroSchema.INPUT_CONTENT_TYPE, InferAvroSchema.USE_MIME_TYPE);
|
||||||
|
|
||||||
|
//Purposely set to True to test that none of the JSON file is read which would cause issues.
|
||||||
|
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "true");
|
||||||
|
runner.setProperty(InferAvroSchema.SCHEMA_DESTINATION, InferAvroSchema.DESTINATION_ATTRIBUTE);
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
|
||||||
|
runner.enqueue(new File("src/test/resources/Shapes.json").toPath(), attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
|
||||||
|
String avroSchema = data.getAttribute(InferAvroSchema.AVRO_SCHEMA_ATTRIBUTE_NAME);
|
||||||
|
String knownSchema = FileUtils.readFileToString(new File("src/test/resources/Shapes.json.avro"));
|
||||||
|
Assert.assertEquals(avroSchema, knownSchema);
|
||||||
|
|
||||||
|
//Since that avro schema is written to an attribute this should be teh same as the original
|
||||||
|
data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void inferAvroSchemaFromCSVFile() throws Exception {
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
//Read in the header
|
||||||
|
StringWriter writer = new StringWriter();
|
||||||
|
IOUtils.copy((Files.newInputStream(Paths.get("src/test/resources/ShapesHeader.csv"), StandardOpenOption.READ)), writer, "UTF-8");
|
||||||
|
runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, writer.toString());
|
||||||
|
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "false");
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
|
||||||
|
runner.enqueue(new File("src/test/resources/Shapes_NoHeader.csv").toPath(), attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_UNSUPPORTED_CONTENT, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
|
||||||
|
data.assertContentEquals(Paths.get("src/test/resources/Shapes_Header.csv.avro"));
|
||||||
|
data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception {
|
||||||
|
|
||||||
|
final String CSV_HEADER_LINE = FileUtils.readFileToString(new File("src/test/resources/ShapesHeader.csv"));
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
runner.setProperty(InferAvroSchema.GET_CSV_HEADER_DEFINITION_FROM_INPUT, "false");
|
||||||
|
runner.setProperty(InferAvroSchema.CSV_HEADER_DEFINITION, CSV_HEADER_LINE);
|
||||||
|
runner.setProperty(InferAvroSchema.HEADER_LINE_SKIP_COUNT, "1");
|
||||||
|
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
|
||||||
|
runner.enqueue((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes(), attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 1);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 1);
|
||||||
|
|
||||||
|
MockFlowFile data = runner.getFlowFilesForRelationship(InferAvroSchema.REL_SUCCESS).get(0);
|
||||||
|
data.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/avro-binary");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void inferSchemaFromEmptyContent() throws Exception {
|
||||||
|
runner.assertValid();
|
||||||
|
|
||||||
|
Map<String, String> attributes = new HashMap<>();
|
||||||
|
attributes.put(CoreAttributes.MIME_TYPE.key(), "text/csv");
|
||||||
|
runner.enqueue("", attributes);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_FAILURE, 1);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_ORIGINAL, 0);
|
||||||
|
runner.assertTransferCount(InferAvroSchema.REL_SUCCESS, 0);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,129 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.kite;
|
|
||||||
;
|
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
|
||||||
import org.apache.nifi.processor.io.StreamCallback;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
|
||||||
import org.apache.nifi.util.TestRunners;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.OutputStream;
|
|
||||||
|
|
||||||
public class TestInferAvroSchemaFromCSV {
|
|
||||||
|
|
||||||
private final String CSV_HEADER_LINE = "fname,lname,age,zip";
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void inferSchemaFromHeaderLineOfCSV() throws Exception {
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
|
|
||||||
|
|
||||||
runner.assertNotValid();
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "0");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
|
|
||||||
|
|
||||||
runner.assertValid();
|
|
||||||
|
|
||||||
ProcessSession session = runner.getProcessSessionFactory().createSession();
|
|
||||||
FlowFile ff = session.write(session.create(), new OutputStreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(OutputStream out) throws IOException {
|
|
||||||
out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
//Enqueue the empty FlowFile
|
|
||||||
runner.enqueue(ff);
|
|
||||||
runner.run();
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void inferSchemaFormHeaderLinePropertyOfProcessor() throws Exception {
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
|
|
||||||
|
|
||||||
runner.assertNotValid();
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE);
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
|
|
||||||
|
|
||||||
runner.assertValid();
|
|
||||||
|
|
||||||
ProcessSession session = runner.getProcessSessionFactory().createSession();
|
|
||||||
FlowFile ff = session.write(session.create(), new StreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in, OutputStream out) throws IOException {
|
|
||||||
out.write((CSV_HEADER_LINE + "\nJeremy,Dyer,29,55555").getBytes());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
//Enqueue the empty FlowFile
|
|
||||||
runner.enqueue(ff);
|
|
||||||
runner.run();
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 0);
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 1);
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void inferSchemaFromEmptyContent() throws Exception {
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(InferAvroSchemaFromCSV.class);
|
|
||||||
|
|
||||||
runner.assertNotValid();
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE, CSV_HEADER_LINE);
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.HEADER_LINE_SKIP_COUNT, "1");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.ESCAPE_STRING, "\\");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.QUOTE_STRING, "'");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.RECORD_NAME, "contact");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.CHARSET, "UTF-8");
|
|
||||||
runner.setProperty(InferAvroSchemaFromCSV.PRETTY_AVRO_OUTPUT, "true");
|
|
||||||
|
|
||||||
runner.assertValid();
|
|
||||||
|
|
||||||
ProcessSession session = runner.getProcessSessionFactory().createSession();
|
|
||||||
FlowFile ff = session.write(session.create(), new StreamCallback() {
|
|
||||||
@Override
|
|
||||||
public void process(InputStream in, OutputStream out) throws IOException {
|
|
||||||
out.write("".getBytes());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
//Enqueue the empty FlowFile
|
|
||||||
runner.enqueue(ff);
|
|
||||||
runner.run();
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_FAILURE, 1);
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_ORIGINAL, 0);
|
|
||||||
runner.assertTransferCount(InferAvroSchemaFromCSV.REL_SUCCESS, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,215 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.processors.kite;
|
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import org.apache.avro.Schema;
|
|
||||||
import org.apache.avro.SchemaBuilder;
|
|
||||||
import org.apache.avro.file.DataFileStream;
|
|
||||||
import org.apache.avro.generic.GenericData;
|
|
||||||
import org.apache.avro.generic.GenericDatumReader;
|
|
||||||
import org.apache.nifi.util.MockFlowFile;
|
|
||||||
import org.apache.nifi.util.TestRunner;
|
|
||||||
import org.apache.nifi.util.TestRunners;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
|
|
||||||
|
|
||||||
public class TestInferAvroSchemaFromJSON {
|
|
||||||
|
|
||||||
public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
|
|
||||||
.fields().requiredString("id").requiredString("primaryColor")
|
|
||||||
.optionalString("secondaryColor").optionalString("price")
|
|
||||||
.endRecord();
|
|
||||||
|
|
||||||
public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
|
|
||||||
.fields().requiredLong("id").requiredString("color")
|
|
||||||
.optionalDouble("price").endRecord();
|
|
||||||
|
|
||||||
public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";
|
|
||||||
|
|
||||||
public static final String FAILURE_SUMMARY = "Cannot convert free to double";
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBasicConversion() throws IOException {
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
|
|
||||||
runner.assertNotValid();
|
|
||||||
runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
|
|
||||||
INPUT_SCHEMA.toString());
|
|
||||||
runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
|
|
||||||
OUTPUT_SCHEMA.toString());
|
|
||||||
runner.setProperty("primaryColor", "color");
|
|
||||||
runner.assertValid();
|
|
||||||
|
|
||||||
// Two valid rows, and one invalid because "free" is not a double.
|
|
||||||
GenericData.Record goodRecord1 = dataBasic("1", "blue", null, null);
|
|
||||||
GenericData.Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
|
|
||||||
GenericData.Record badRecord = dataBasic("3", "red", "yellow", "free");
|
|
||||||
List<GenericData.Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
|
|
||||||
badRecord);
|
|
||||||
|
|
||||||
runner.enqueue(streamFor(input));
|
|
||||||
runner.run();
|
|
||||||
|
|
||||||
long converted = runner.getCounterValue("Converted records");
|
|
||||||
long errors = runner.getCounterValue("Conversion errors");
|
|
||||||
Assert.assertEquals("Should convert 2 rows", 2, converted);
|
|
||||||
Assert.assertEquals("Should reject 1 rows", 1, errors);
|
|
||||||
|
|
||||||
runner.assertTransferCount("success", 1);
|
|
||||||
runner.assertTransferCount("failure", 1);
|
|
||||||
|
|
||||||
MockFlowFile incompatible = runner.getFlowFilesForRelationship(
|
|
||||||
"failure").get(0);
|
|
||||||
GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<GenericData.Record>(
|
|
||||||
INPUT_SCHEMA);
|
|
||||||
DataFileStream<GenericData.Record> stream = new DataFileStream<GenericData.Record>(
|
|
||||||
new ByteArrayInputStream(
|
|
||||||
runner.getContentAsByteArray(incompatible)), reader);
|
|
||||||
int count = 0;
|
|
||||||
for (GenericData.Record r : stream) {
|
|
||||||
Assert.assertEquals(badRecord, r);
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
stream.close();
|
|
||||||
Assert.assertEquals(1, count);
|
|
||||||
Assert.assertEquals("Should accumulate error messages",
|
|
||||||
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
|
|
||||||
|
|
||||||
GenericDatumReader<GenericData.Record> successReader = new GenericDatumReader<GenericData.Record>(
|
|
||||||
OUTPUT_SCHEMA);
|
|
||||||
DataFileStream<GenericData.Record> successStream = new DataFileStream<GenericData.Record>(
|
|
||||||
new ByteArrayInputStream(runner.getContentAsByteArray(runner
|
|
||||||
.getFlowFilesForRelationship("success").get(0))),
|
|
||||||
successReader);
|
|
||||||
count = 0;
|
|
||||||
for (GenericData.Record r : successStream) {
|
|
||||||
if (count == 0) {
|
|
||||||
Assert.assertEquals(convertBasic(goodRecord1), r);
|
|
||||||
} else {
|
|
||||||
Assert.assertEquals(convertBasic(goodRecord2), r);
|
|
||||||
}
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
successStream.close();
|
|
||||||
Assert.assertEquals(2, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testNestedConversion() throws IOException {
|
|
||||||
TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
|
|
||||||
runner.assertNotValid();
|
|
||||||
runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
|
|
||||||
TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
|
|
||||||
runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
|
|
||||||
TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
|
|
||||||
runner.setProperty("parent.id", "parentId");
|
|
||||||
runner.assertValid();
|
|
||||||
|
|
||||||
// Two valid rows
|
|
||||||
GenericData.Record goodRecord1 = dataNested(1L, "200", null, null);
|
|
||||||
GenericData.Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
|
|
||||||
List<GenericData.Record> input = Lists.newArrayList(goodRecord1, goodRecord2);
|
|
||||||
|
|
||||||
runner.enqueue(streamFor(input));
|
|
||||||
runner.run();
|
|
||||||
|
|
||||||
long converted = runner.getCounterValue("Converted records");
|
|
||||||
long errors = runner.getCounterValue("Conversion errors");
|
|
||||||
Assert.assertEquals("Should convert 2 rows", 2, converted);
|
|
||||||
Assert.assertEquals("Should reject 0 rows", 0, errors);
|
|
||||||
|
|
||||||
runner.assertTransferCount("success", 1);
|
|
||||||
runner.assertTransferCount("failure", 0);
|
|
||||||
|
|
||||||
GenericDatumReader<GenericData.Record> successReader = new GenericDatumReader<GenericData.Record>(
|
|
||||||
TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
|
|
||||||
DataFileStream<GenericData.Record> successStream = new DataFileStream<GenericData.Record>(
|
|
||||||
new ByteArrayInputStream(runner.getContentAsByteArray(runner
|
|
||||||
.getFlowFilesForRelationship("success").get(0))),
|
|
||||||
successReader);
|
|
||||||
int count = 0;
|
|
||||||
for (GenericData.Record r : successStream) {
|
|
||||||
if (count == 0) {
|
|
||||||
Assert.assertEquals(convertNested(goodRecord1), r);
|
|
||||||
} else {
|
|
||||||
Assert.assertEquals(convertNested(goodRecord2), r);
|
|
||||||
}
|
|
||||||
count++;
|
|
||||||
}
|
|
||||||
successStream.close();
|
|
||||||
Assert.assertEquals(2, count);
|
|
||||||
}
|
|
||||||
|
|
||||||
private GenericData.Record convertBasic(GenericData.Record inputRecord) {
|
|
||||||
GenericData.Record result = new GenericData.Record(OUTPUT_SCHEMA);
|
|
||||||
result.put("id", Long.parseLong(inputRecord.get("id").toString()));
|
|
||||||
result.put("color", inputRecord.get("primaryColor").toString());
|
|
||||||
if (inputRecord.get("price") == null) {
|
|
||||||
result.put("price", null);
|
|
||||||
} else {
|
|
||||||
result.put("price",
|
|
||||||
Double.parseDouble(inputRecord.get("price").toString()));
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private GenericData.Record dataBasic(String id, String primaryColor,
|
|
||||||
String secondaryColor, String price) {
|
|
||||||
GenericData.Record result = new GenericData.Record(INPUT_SCHEMA);
|
|
||||||
result.put("id", id);
|
|
||||||
result.put("primaryColor", primaryColor);
|
|
||||||
result.put("secondaryColor", secondaryColor);
|
|
||||||
result.put("price", price);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private GenericData.Record convertNested(GenericData.Record inputRecord) {
|
|
||||||
GenericData.Record result = new GenericData.Record(
|
|
||||||
TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
|
|
||||||
result.put("l1", inputRecord.get("l1"));
|
|
||||||
result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
|
|
||||||
if (inputRecord.get("parent") != null) {
|
|
||||||
// output schema doesn't have parent name.
|
|
||||||
result.put("parentId",
|
|
||||||
((GenericData.Record) inputRecord.get("parent")).get("id"));
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
private GenericData.Record dataNested(long id, String companyName, Long parentId,
|
|
||||||
String parentName) {
|
|
||||||
GenericData.Record result = new GenericData.Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
|
|
||||||
result.put("l1", id);
|
|
||||||
result.put("s1", companyName);
|
|
||||||
if (parentId != null || parentName != null) {
|
|
||||||
GenericData.Record parent = new GenericData.Record(
|
|
||||||
TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
|
|
||||||
parent.put("id", parentId);
|
|
||||||
parent.put("name", parentName);
|
|
||||||
result.put("parent", parent);
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
{
|
||||||
|
"shapes":
|
||||||
|
[
|
||||||
|
{"shape": "circle", "color": "red", "width": 100, "height": 100},
|
||||||
|
{"shape": "square", "color": "red", "width": 100, "height": 100},
|
||||||
|
{"shape": "sphere", "color": "red", "width": 100, "height": 100},
|
||||||
|
{"shape": "triangle", "color": "red", "width": 100, "height": 100},
|
||||||
|
{"shape": "rectangle", "color": "red", "width": 100, "height": 100}
|
||||||
|
]
|
||||||
|
}
|
|
@ -0,0 +1,34 @@
|
||||||
|
{
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "contact",
|
||||||
|
"namespace" : "com.jeremydyer",
|
||||||
|
"fields" : [ {
|
||||||
|
"name" : "shapes",
|
||||||
|
"type" : {
|
||||||
|
"type" : "array",
|
||||||
|
"items" : {
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "shapes",
|
||||||
|
"namespace" : "",
|
||||||
|
"fields" : [ {
|
||||||
|
"name" : "shape",
|
||||||
|
"type" : "string",
|
||||||
|
"doc" : "Type inferred from '\"circle\"'"
|
||||||
|
}, {
|
||||||
|
"name" : "color",
|
||||||
|
"type" : "string",
|
||||||
|
"doc" : "Type inferred from '\"red\"'"
|
||||||
|
}, {
|
||||||
|
"name" : "width",
|
||||||
|
"type" : "int",
|
||||||
|
"doc" : "Type inferred from '100'"
|
||||||
|
}, {
|
||||||
|
"name" : "height",
|
||||||
|
"type" : "int",
|
||||||
|
"doc" : "Type inferred from '100'"
|
||||||
|
} ]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"doc" : "Type inferred from '[{\"shape\":\"circle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"square\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"sphere\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"triangle\",\"color\":\"red\",\"width\":100,\"height\":100},{\"shape\":\"rectangle\",\"color\":\"red\",\"width\":100,\"height\":100}]'"
|
||||||
|
} ]
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
shape,color,width,height
|
|
|
@ -0,0 +1,352 @@
|
||||||
|
shape,color,width,height
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
|
|
@ -0,0 +1,351 @@
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
square,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
rectangle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
sphere,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
triangle,red,100,100
|
||||||
|
cone,red,100,100
|
||||||
|
circle,red,100,100
|
||||||
|
rectangle,red,100,100
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
{
|
||||||
|
"type" : "record",
|
||||||
|
"name" : "contact",
|
||||||
|
"namespace" : "com.jeremydyer",
|
||||||
|
"doc" : "Schema generated by Kite",
|
||||||
|
"fields" : [ {
|
||||||
|
"name" : "shape",
|
||||||
|
"type" : "string",
|
||||||
|
"doc" : "Type inferred from 'circle'"
|
||||||
|
}, {
|
||||||
|
"name" : "color",
|
||||||
|
"type" : "string",
|
||||||
|
"doc" : "Type inferred from 'red'"
|
||||||
|
}, {
|
||||||
|
"name" : "width",
|
||||||
|
"type" : "long",
|
||||||
|
"doc" : "Type inferred from '100'"
|
||||||
|
}, {
|
||||||
|
"name" : "height",
|
||||||
|
"type" : "long",
|
||||||
|
"doc" : "Type inferred from '100'"
|
||||||
|
} ]
|
||||||
|
}
|
Loading…
Reference in New Issue