mirror of https://github.com/apache/nifi.git
NIFI-238: Add Kite processors.
Includes:
* KiteStorageProcessor - store Avro files in a Kite dataset
* KiteCSVToAvroProcessor - convert CSV to Avro for storage
* KiteJSONToAvroProcessor - convert JSON to Avro for storage
parent b8ade5b129
commit 8242417388
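For context, a minimal sketch of how the CSV converter added here is exercised with NiFi's test harness (this mirrors the TestCSVToAvroProcessor test included below; SCHEMA and streamFor are the helpers defined in that test class, and the input values are illustrative):

    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    // the "Record schema" property accepts an Avro schema as a JSON literal or a URI
    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
    runner.enqueue(streamFor("1,green\n2,grey,12.95"));  // CSV rows matching the schema fields
    runner.run();
    runner.assertAllFlowFilesTransferred("success", 1);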
@@ -0,0 +1,35 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
  -->
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-kite-bundle</artifactId>
    <version>0.0.2-incubating-SNAPSHOT</version>
  </parent>

  <artifactId>nifi-kite-nar</artifactId>
  <packaging>nar</packaging>

  <name>Kite NAR</name>

  <dependencies>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-kite-processors</artifactId>
    </dependency>
  </dependencies>
</project>
@@ -0,0 +1,173 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements. See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
  -->
  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-kite-bundle</artifactId>
    <version>0.0.2-incubating-SNAPSHOT</version>
  </parent>

  <artifactId>nifi-kite-processors</artifactId>
  <packaging>jar</packaging>
  <name>Kite Hadoop Processors</name>

  <properties>
    <kite.version>0.18.0</kite.version>
    <guava.version>11.0.2</guava.version>
    <junit.version>4.10</junit.version>
    <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
  </properties>

  <dependencies>
    <!-- NiFi -->

    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-api</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-utils</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-processor-utils</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-flowfile-packager</artifactId>
    </dependency>

    <!-- Kite -->

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-data-core</artifactId>
      <version>${kite.version}</version>
      <exclusions>
        <exclusion>
          <!-- Use findbugs-annotations instead -->
          <groupId>com.google.code.findbugs</groupId>
          <artifactId>jsr305</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-data-hive</artifactId>
      <version>${kite.version}</version>
      <exclusions>
        <exclusion>
          <!-- Use findbugs-annotations instead -->
          <groupId>com.google.code.findbugs</groupId>
          <artifactId>jsr305</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-data-hbase</artifactId>
      <version>${kite.version}</version>
      <exclusions>
        <exclusion>
          <!-- Use findbugs-annotations instead -->
          <groupId>com.google.code.findbugs</groupId>
          <artifactId>jsr305</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-hadoop-dependencies</artifactId>
      <type>pom</type>
      <version>${kite.version}</version>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-hbase-dependencies</artifactId>
      <type>pom</type>
      <scope>provided</scope>
      <optional>true</optional>
      <version>${kite.version}</version>
    </dependency>

    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>${guava.version}</version>
      <scope>compile</scope>
    </dependency>

    <dependency>
      <!-- avoid warnings by bundling annotations -->
      <groupId>com.github.stephenc.findbugs</groupId>
      <artifactId>findbugs-annotations</artifactId>
      <scope>compile</scope>
      <version>${findbugs-annotations.version}</version>
    </dependency>

    <!-- Test dependencies -->

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
      <version>${junit.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.nifi</groupId>
      <artifactId>nifi-mock</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-minicluster</artifactId>
      <version>${kite.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.sun.jersey</groupId>
      <artifactId>jersey-servlet</artifactId>
      <version>1.14</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-data-core</artifactId>
      <version>${kite.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.kitesdk</groupId>
      <artifactId>kite-hadoop-test-dependencies</artifactId>
      <type>pom</type>
      <scope>test</scope>
      <version>${kite.version}</version>
    </dependency>

  </dependencies>
</project>
@@ -0,0 +1,217 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.spi.DefaultConfiguration;

abstract class AbstractKiteProcessor extends AbstractProcessor {

  private static final Splitter COMMA = Splitter.on(',').trimResults();
  protected static final Validator FILES_EXIST = new Validator() {
    @Override
    public ValidationResult validate(String subject, String configFiles,
        ValidationContext context) {
      if (configFiles != null && !configFiles.isEmpty()) {
        for (String file : COMMA.split(configFiles)) {
          ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
              .validate(subject, file, context);
          if (!result.isValid()) {
            return result;
          }
        }
      }
      return new ValidationResult.Builder()
          .subject(subject)
          .input(configFiles)
          .explanation("Files exist")
          .valid(true)
          .build();
    }
  };

  protected static final PropertyDescriptor CONF_XML_FILES =
      new PropertyDescriptor.Builder()
          .name("Hadoop configuration files")
          .description("A comma-separated list of Hadoop configuration files")
          .addValidator(FILES_EXIST)
          .build();

  protected static final Validator RECOGNIZED_URI = new Validator() {
    @Override
    public ValidationResult validate(String subject, String uri,
        ValidationContext context) {
      String message = "not set";
      boolean isValid = true;
      if (uri == null || uri.isEmpty()) {
        isValid = false;
      } else {
        try {
          new URIBuilder(URI.create(uri)).build();
        } catch (RuntimeException e) {
          message = e.getMessage();
          isValid = false;
        }
      }
      return new ValidationResult.Builder()
          .subject(subject)
          .input(uri)
          .explanation("Dataset URI is invalid: " + message)
          .valid(isValid)
          .build();
    }
  };

  /**
   * Resolves a {@link Schema} for the given string, either a URI or a JSON
   * literal.
   */
  protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
    URI uri;
    try {
      uri = new URI(uriOrLiteral);
    } catch (URISyntaxException e) {
      // try to parse the schema as a literal
      return parseSchema(uriOrLiteral);
    }

    try {
      if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
        return Datasets.load(uri).getDataset().getDescriptor().getSchema();
      } else if ("resource".equals(uri.getScheme())) {
        InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
            .openStream();
        return parseSchema(uri, in);
      } else {
        // try to open the file
        Path schemaPath = new Path(uri);
        FileSystem fs = schemaPath.getFileSystem(conf);
        return parseSchema(uri, fs.open(schemaPath));
      }

    } catch (DatasetNotFoundException e) {
      throw new SchemaNotFoundException(
          "Cannot read schema of missing dataset: " + uri, e);
    } catch (IOException e) {
      throw new SchemaNotFoundException(
          "Failed while reading " + uri + ": " + e.getMessage(), e);
    }
  }

  private static Schema parseSchema(String literal) {
    try {
      return new Schema.Parser().parse(literal);
    } catch (RuntimeException e) {
      throw new SchemaNotFoundException(
          "Failed to parse schema: " + literal, e);
    }
  }

  private static Schema parseSchema(URI uri, InputStream in) throws IOException {
    try {
      return new Schema.Parser().parse(in);
    } catch (RuntimeException e) {
      throw new SchemaNotFoundException("Failed to parse schema at " + uri, e);
    }
  }

  protected static final Validator SCHEMA_VALIDATOR = new Validator() {
    @Override
    public ValidationResult validate(String subject, String uri, ValidationContext context) {
      Configuration conf = getConfiguration(
          context.getProperty(CONF_XML_FILES).getValue());

      String error = null;
      try {
        getSchema(uri, conf);
      } catch (SchemaNotFoundException e) {
        error = e.getMessage();
      }
      return new ValidationResult.Builder()
          .subject(subject)
          .input(uri)
          .explanation(error)
          .valid(error == null)
          .build();
    }
  };

  protected static final List<PropertyDescriptor> ABSTRACT_KITE_PROPS =
      ImmutableList.<PropertyDescriptor>builder()
          .add(CONF_XML_FILES)
          .build();

  static List<PropertyDescriptor> getProperties() {
    return ABSTRACT_KITE_PROPS;
  }

  @OnScheduled
  protected void setDefaultConfiguration(ProcessContext context)
      throws IOException {
    DefaultConfiguration.set(getConfiguration(
        context.getProperty(CONF_XML_FILES).getValue()));
  }

  protected static Configuration getConfiguration(String configFiles) {
    Configuration conf = DefaultConfiguration.get();

    if (configFiles == null || configFiles.isEmpty()) {
      return conf;
    }

    for (String file : COMMA.split(configFiles)) {
      // process each resource only once
      if (conf.getResource(file) == null) {
        // use Path instead of String to get the file from the FS
        conf.addResource(new Path(file));
      }
    }

    return conf;
  }

  @Override
  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return ABSTRACT_KITE_PROPS;
  }
}
@@ -0,0 +1,43 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;

import static org.apache.avro.generic.GenericData.StringType;

class AvroUtil {

  @SuppressWarnings("unchecked")
  public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
    return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
  }

  @SuppressWarnings("unchecked")
  public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
    return (DatumReader<D>) GenericData.get().createDatumReader(schema);
  }

}
@@ -0,0 +1,258 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.filesystem.CSVFileReaderFixed;
import org.kitesdk.data.spi.filesystem.CSVProperties;

import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;

@Tags({"kite", "csv", "avro"})
@CapabilityDescription(
    "Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro extends AbstractKiteProcessor {
  private static CSVProperties DEFAULTS = new CSVProperties.Builder().build();

  private static Validator CHAR_VALIDATOR = new Validator() {
    @Override
    public ValidationResult validate(String subject, String input,
        ValidationContext context) {
      return new ValidationResult.Builder()
          .subject(subject)
          .input(input)
          .explanation("Only single characters are supported")
          .valid(input.length() == 1)
          .build();
    }
  };

  private static Relationship SUCCESS = new Relationship.Builder()
      .name("success")
      .description("FlowFile content has been successfully saved")
      .build();

  private static Relationship FAILURE = new Relationship.Builder()
      .name("failure")
      .description("FlowFile content could not be processed")
      .build();

  @VisibleForTesting
  static final PropertyDescriptor SCHEMA =
      new PropertyDescriptor.Builder()
          .name("Record schema")
          .description(
              "Outgoing Avro schema for each record created from a CSV row")
          .addValidator(SCHEMA_VALIDATOR)
          .required(true)
          .build();

  @VisibleForTesting
  static final PropertyDescriptor CHARSET =
      new PropertyDescriptor.Builder()
          .name("CSV charset")
          .description("Character set for CSV files")
          .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
          .defaultValue(DEFAULTS.charset)
          .build();

  @VisibleForTesting
  static final PropertyDescriptor DELIMITER =
      new PropertyDescriptor.Builder()
          .name("CSV delimiter")
          .description("Delimiter character for CSV records")
          .addValidator(CHAR_VALIDATOR)
          .defaultValue(DEFAULTS.delimiter)
          .build();

  @VisibleForTesting
  static final PropertyDescriptor QUOTE =
      new PropertyDescriptor.Builder()
          .name("CSV quote character")
          .description("Quote character for CSV values")
          .addValidator(CHAR_VALIDATOR)
          .defaultValue(DEFAULTS.quote)
          .build();

  @VisibleForTesting
  static final PropertyDescriptor ESCAPE =
      new PropertyDescriptor.Builder()
          .name("CSV escape character")
          .description("Escape character for CSV values")
          .addValidator(CHAR_VALIDATOR)
          .defaultValue(DEFAULTS.escape)
          .build();

  @VisibleForTesting
  static final PropertyDescriptor HAS_HEADER =
      new PropertyDescriptor.Builder()
          .name("Use CSV header line")
          .description("Whether to use the first line as a header")
          .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
          .defaultValue(String.valueOf(DEFAULTS.useHeader))
          .build();

  @VisibleForTesting
  static final PropertyDescriptor LINES_TO_SKIP =
      new PropertyDescriptor.Builder()
          .name("Lines to skip")
          .description("Number of lines to skip before reading header or data")
          .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
          .defaultValue(String.valueOf(DEFAULTS.linesToSkip))
          .build();

  private static final List<PropertyDescriptor> PROPERTIES =
      ImmutableList.<PropertyDescriptor>builder()
          .addAll(AbstractKiteProcessor.getProperties())
          .add(SCHEMA)
          .add(CHARSET)
          .add(DELIMITER)
          .add(QUOTE)
          .add(ESCAPE)
          .add(HAS_HEADER)
          .add(LINES_TO_SKIP)
          .build();

  private static final Set<Relationship> RELATIONSHIPS =
      ImmutableSet.<Relationship>builder()
          .add(SUCCESS)
          .add(FAILURE)
          .build();

  // Immutable configuration
  @VisibleForTesting
  volatile CSVProperties props;

  @Override
  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return PROPERTIES;
  }

  @Override
  public Set<Relationship> getRelationships() {
    return RELATIONSHIPS;
  }

  @OnScheduled
  public void createCSVProperties(ProcessContext context) throws IOException {
    super.setDefaultConfiguration(context);

    this.props = new CSVProperties.Builder()
        .charset(context.getProperty(CHARSET).getValue())
        .delimiter(context.getProperty(DELIMITER).getValue())
        .quote(context.getProperty(QUOTE).getValue())
        .escape(context.getProperty(ESCAPE).getValue())
        .hasHeader(context.getProperty(HAS_HEADER).asBoolean())
        .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
        .build();
  }

  @Override
  public void onTrigger(ProcessContext context, final ProcessSession session)
      throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
      return;
    }

    final Schema schema = getSchema(
        context.getProperty(SCHEMA).getValue(),
        DefaultConfiguration.get());

    final DataFileWriter<Record> writer = new DataFileWriter<>(
        AvroUtil.newDatumWriter(schema, Record.class));
    writer.setCodec(CodecFactory.snappyCodec());

    try {
      flowFile = session.write(flowFile, new StreamCallback() {
        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
          long written = 0L;
          long errors = 0L;
          try (CSVFileReaderFixed<Record> reader = new CSVFileReaderFixed<>(
              in, props, schema, Record.class)) {
            reader.initialize();
            try (DataFileWriter<Record> w = writer.create(schema, out)) {
              while (reader.hasNext()) {
                try {
                  Record record = reader.next();
                  w.append(record);
                  written += 1;
                } catch (DatasetRecordException e) {
                  errors += 1;
                }
              }
            }
          }
          session.adjustCounter("Converted records", written,
              false /* update only if file transfer is successful */);
          session.adjustCounter("Conversion errors", errors,
              false /* update only if file transfer is successful */);
        }
      });

      session.transfer(flowFile, SUCCESS);

      //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
    } catch (ProcessException | DatasetIOException e) {
      getLogger().error("Failed reading or writing", e);
      session.transfer(flowFile, FAILURE);

    } catch (DatasetException e) {
      getLogger().error("Failed to read FlowFile", e);
      session.transfer(flowFile, FAILURE);

    } catch (Throwable t) {
      getLogger().error("Unknown Throwable", t);
      session.rollback(true); // penalize just in case
      context.yield();
    }
  }
}
@@ -0,0 +1,157 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.spi.DefaultConfiguration;

@Tags({"kite", "json", "avro"})
@CapabilityDescription(
    "Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro extends AbstractKiteProcessor {

  private static Relationship SUCCESS = new Relationship.Builder()
      .name("success")
      .description("FlowFile content has been successfully saved")
      .build();

  private static Relationship FAILURE = new Relationship.Builder()
      .name("failure")
      .description("FlowFile content could not be processed")
      .build();

  @VisibleForTesting
  static final PropertyDescriptor SCHEMA =
      new PropertyDescriptor.Builder()
          .name("Record schema")
          .description(
              "Outgoing Avro schema for each record created from a JSON object")
          .addValidator(SCHEMA_VALIDATOR)
          .required(true)
          .build();

  private static final List<PropertyDescriptor> PROPERTIES =
      ImmutableList.<PropertyDescriptor>builder()
          .addAll(AbstractKiteProcessor.getProperties())
          .add(SCHEMA)
          .build();

  private static final Set<Relationship> RELATIONSHIPS =
      ImmutableSet.<Relationship>builder()
          .add(SUCCESS)
          .add(FAILURE)
          .build();

  public ConvertJSONToAvro() {
  }

  @Override
  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return PROPERTIES;
  }

  @Override
  public Set<Relationship> getRelationships() {
    return RELATIONSHIPS;
  }

  @Override
  public void onTrigger(ProcessContext context, final ProcessSession session)
      throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
      return;
    }

    final Schema schema = getSchema(
        context.getProperty(SCHEMA).getValue(),
        DefaultConfiguration.get());

    final DataFileWriter<Record> writer = new DataFileWriter<>(
        AvroUtil.newDatumWriter(schema, Record.class));
    writer.setCodec(CodecFactory.snappyCodec());

    try {
      flowFile = session.write(flowFile, new StreamCallback() {
        @Override
        public void process(InputStream in, OutputStream out) throws IOException {
          long written = 0L;
          long errors = 0L;
          try (JSONFileReader<Record> reader = new JSONFileReader<>(
              in, schema, Record.class)) {
            reader.initialize();
            try (DataFileWriter<Record> w = writer.create(schema, out)) {
              while (reader.hasNext()) {
                try {
                  Record record = reader.next();
                  w.append(record);
                  written += 1;
                } catch (DatasetRecordException e) {
                  errors += 1;
                }
              }
            }
            session.adjustCounter("Converted records", written,
                false /* update only if file transfer is successful */);
            session.adjustCounter("Conversion errors", errors,
                false /* update only if file transfer is successful */);
          }
        }
      });

      session.transfer(flowFile, SUCCESS);

      //session.getProvenanceReporter().send(flowFile, target.getUri().toString());
    } catch (ProcessException | DatasetIOException e) {
      getLogger().error("Failed reading or writing", e);
      session.transfer(flowFile, FAILURE);

    } catch (DatasetException e) {
      getLogger().error("Failed to read FlowFile", e);
      session.transfer(flowFile, FAILURE);

    }
  }

}
@@ -0,0 +1,114 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.JsonUtil;
import org.kitesdk.data.spi.ReaderWriterState;

/**
 * This is a temporary addition. The version in 0.18.0 throws a NPE when the
 * InputStream constructor is used.
 */
class JSONFileReader<E> extends AbstractDatasetReader<E> {

  private final GenericData model;
  private final Schema schema;

  private InputStream incoming = null;

  // state
  private ReaderWriterState state = ReaderWriterState.NEW;
  private Iterator<E> iterator;

  public JSONFileReader(InputStream incoming, Schema schema, Class<E> type) {
    this.incoming = incoming;
    this.schema = schema;
    this.model = DataModelUtil.getDataModelForType(type);
    this.state = ReaderWriterState.NEW;
  }

  @Override
  public void initialize() {
    Preconditions.checkState(state.equals(ReaderWriterState.NEW),
        "A reader may not be opened more than once - current state:%s", state);
    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
        "Schemas for JSON files should be record");

    this.iterator = Iterators.transform(JsonUtil.parser(incoming),
        new Function<JsonNode, E>() {
          @Override
          @SuppressWarnings("unchecked")
          public E apply(@Nullable JsonNode node) {
            return (E) JsonUtil.convertToAvro(model, node, schema);
          }
        });

    this.state = ReaderWriterState.OPEN;
  }

  @Override
  public boolean hasNext() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);
    return iterator.hasNext();
  }

  @Override
  public E next() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);
    return iterator.next();
  }

  @Override
  public void close() {
    if (!state.equals(ReaderWriterState.OPEN)) {
      return;
    }

    iterator = null;
    try {
      incoming.close();
    } catch (IOException e) {
      throw new DatasetIOException("Unable to close reader path", e);
    }

    state = ReaderWriterState.CLOSED;
  }

  @Override
  public boolean isOpen() {
    return (this.state == ReaderWriterState.OPEN);
  }

}
@@ -0,0 +1,168 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.StopWatch;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.IncompatibleSchemaException;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.SchemaValidationUtil;

@Tags({"kite", "avro", "parquet", "hive", "hdfs", "hbase"})
@CapabilityDescription("Stores Avro records in a Kite dataset")
public class StoreInKiteDataset extends AbstractKiteProcessor {
  private static Relationship SUCCESS = new Relationship.Builder()
      .name("success")
      .description("FlowFile content has been successfully saved")
      .build();

  private static Relationship INCOMPATIBLE = new Relationship.Builder()
      .name("incompatible")
      .description("FlowFile content is not compatible with the target dataset")
      .build();

  private static Relationship FAILURE = new Relationship.Builder()
      .name("failure")
      .description("FlowFile content could not be processed")
      .build();

  public static final PropertyDescriptor KITE_DATASET_URI =
      new PropertyDescriptor.Builder()
          .name("Target dataset URI")
          .description(
              "URI that identifies a Kite dataset where data will be stored")
          .addValidator(RECOGNIZED_URI)
          .expressionLanguageSupported(true)
          .required(true)
          .build();

  private static final List<PropertyDescriptor> PROPERTIES =
      ImmutableList.<PropertyDescriptor>builder()
          .addAll(AbstractKiteProcessor.getProperties())
          .add(KITE_DATASET_URI)
          .build();

  private static final Set<Relationship> RELATIONSHIPS =
      ImmutableSet.<Relationship>builder()
          .add(SUCCESS)
          .add(INCOMPATIBLE)
          .add(FAILURE)
          .build();

  @Override
  protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    return PROPERTIES;
  }

  @Override
  public Set<Relationship> getRelationships() {
    return RELATIONSHIPS;
  }

  @Override
  public void onTrigger(ProcessContext context, final ProcessSession session)
      throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
      return;
    }

    final View<Record> target = load(context, flowFile);
    final Schema schema = target.getDataset().getDescriptor().getSchema();

    try {
      StopWatch timer = new StopWatch(true);
      session.read(flowFile, new InputStreamCallback() {
        @Override
        public void process(InputStream in) throws IOException {
          try (DataFileStream<Record> stream = new DataFileStream<>(
              in, AvroUtil.newDatumReader(schema, Record.class))) {
            IncompatibleSchemaException.check(
                SchemaValidationUtil.canRead(stream.getSchema(), schema),
                "Incompatible file schema %s, expected %s",
                stream.getSchema(), schema);

            long written = 0L;
            try (DatasetWriter<Record> writer = target.newWriter()) {
              for (Record record : stream) {
                writer.write(record);
                written += 1;
              }
            } finally {
              session.adjustCounter("Stored records", written,
                  true /* cannot roll back the write */);
            }
          }
        }
      });
      timer.stop();

      session.getProvenanceReporter().send(flowFile,
          target.getUri().toString(),
          timer.getDuration(TimeUnit.MILLISECONDS),
          true /* cannot roll back the write */ );

      session.transfer(flowFile, SUCCESS);

    } catch (ProcessException | DatasetIOException e) {
      getLogger().error("Failed to read FlowFile", e);
      session.transfer(flowFile, FAILURE);

    } catch (ValidationException e) {
      getLogger().error(e.getMessage());
      getLogger().debug("Incompatible schema error", e);
      session.transfer(flowFile, INCOMPATIBLE);

    } catch (Throwable t) {
      getLogger().error("Unknown Throwable", t);
      session.rollback(true); // penalize just in case
      context.yield();
    }
  }

  private View<Record> load(ProcessContext context, FlowFile file) {
    String uri = context.getProperty(KITE_DATASET_URI)
        .evaluateAttributeExpressions(file)
        .getValue();
    return Datasets.load(uri, Record.class);
  }
}
@@ -0,0 +1,172 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.kitesdk.data.spi.filesystem;

import au.com.bytecode.opencsv.CSVReader;
import java.io.InputStream;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.ReaderWriterState;
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.spi.filesystem.CSVProperties;
import org.kitesdk.data.spi.filesystem.CSVUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.NoSuchElementException;

import static org.kitesdk.data.spi.filesystem.FileSystemProperties.REUSE_RECORDS;

/**
 * This is a temporary addition. The version in 0.18.0 throws a NPE when the
 * InputStream constructor is used.
 */
public class CSVFileReaderFixed<E> extends AbstractDatasetReader<E> {

  private final CSVProperties props;
  private final Schema schema;
  private final boolean reuseRecords;

  private final Class<E> recordClass;

  private CSVReader reader = null;
  private CSVRecordBuilder<E> builder;

  private InputStream incoming = null;

  // state
  private ReaderWriterState state = ReaderWriterState.NEW;
  private boolean hasNext = false;
  private String[] next = null;
  private E record = null;

  public CSVFileReaderFixed(InputStream incoming, CSVProperties props,
      Schema schema, Class<E> type) {
    this.incoming = incoming;
    this.schema = schema;
    this.recordClass = type;
    this.state = ReaderWriterState.NEW;
    this.props = props;
    this.reuseRecords = false;

    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
        "Schemas for CSV files must be records of primitive types");
  }

  @Override
  @SuppressWarnings("unchecked")
  public void initialize() {
    Preconditions.checkState(state.equals(ReaderWriterState.NEW),
        "A reader may not be opened more than once - current state:%s", state);

    this.reader = CSVUtil.newReader(incoming, props);

    List<String> header = null;
    if (props.useHeader) {
      this.hasNext = advance();
      header = Lists.newArrayList(next);
    } else if (props.header != null) {
      try {
        header = Lists.newArrayList(
            CSVUtil.newParser(props).parseLine(props.header));
      } catch (IOException e) {
        throw new DatasetIOException(
            "Failed to parse header from properties: " + props.header, e);
      }
    }

    this.builder = new CSVRecordBuilder<E>(schema, recordClass, header);

    // initialize by reading the first record
    this.hasNext = advance();

    this.state = ReaderWriterState.OPEN;
  }

  @Override
  public boolean hasNext() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);
    return hasNext;
  }

  @Override
  public E next() {
    Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
        "Attempt to read from a file in state:%s", state);

    if (!hasNext) {
      throw new NoSuchElementException();
    }

    try {
      if (reuseRecords) {
        this.record = builder.makeRecord(next, record);
        return record;
      } else {
        return builder.makeRecord(next, null);
      }
    } finally {
      this.hasNext = advance();
    }
  }

  private boolean advance() {
    try {
      next = reader.readNext();
    } catch (IOException e) {
      throw new DatasetIOException("Could not read record", e);
    }
    return (next != null);
  }

  @Override
  public void close() {
    if (!state.equals(ReaderWriterState.OPEN)) {
      return;
    }

    try {
      reader.close();
    } catch (IOException e) {
      throw new DatasetIOException("Unable to close reader", e);
    }

    state = ReaderWriterState.CLOSED;
  }

  @Override
  public boolean isOpen() {
    return (this.state == ReaderWriterState.OPEN);
  }
}
@@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.kite.StoreInKiteDataset
org.apache.nifi.processors.kite.ConvertCSVToAvro
org.apache.nifi.processors.kite.ConvertJSONToAvro
@@ -0,0 +1,126 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.nifi.processors.kite.TestUtil.streamFor;

public class TestCSVToAvroProcessor {

  public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
      .requiredLong("id")
      .requiredString("color")
      .optionalDouble("price")
      .endRecord();

  public static final String CSV_CONTENT = "" +
      "1,green\n" +
      ",blue,\n" + // invalid, ID is missing
      "2,grey,12.95";

  @Test
  public void testBasicConversion() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    runner.assertNotValid();
    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
    runner.assertValid();

    runner.enqueue(streamFor(CSV_CONTENT));
    runner.run();

    long converted = runner.getCounterValue("Converted records");
    long errors = runner.getCounterValue("Conversion errors");
    Assert.assertEquals("Should convert 2 rows", 2, converted);
    Assert.assertEquals("Should reject 1 row", 1, errors);

    runner.assertAllFlowFilesTransferred("success", 1);
  }

  @Test
  public void testAlternateCharset() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
    runner.assertValid();

    runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
    runner.run();

    long converted = runner.getCounterValue("Converted records");
    long errors = runner.getCounterValue("Conversion errors");
    Assert.assertEquals("Should convert 2 rows", 2, converted);
    Assert.assertEquals("Should reject 1 row", 1, errors);

    runner.assertAllFlowFilesTransferred("success", 1);
  }

  @Test
  public void testCSVProperties() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
    ConvertCSVToAvro processor = new ConvertCSVToAvro();
    ProcessContext context = runner.getProcessContext();

    // check defaults
    processor.createCSVProperties(context);
    Assert.assertEquals("Charset should match",
        "utf8", processor.props.charset);
    Assert.assertEquals("Delimiter should match",
        ",", processor.props.delimiter);
    Assert.assertEquals("Quote should match",
        "\"", processor.props.quote);
    Assert.assertEquals("Escape should match",
        "\\", processor.props.escape);
    Assert.assertEquals("Header flag should match",
        false, processor.props.useHeader);
    Assert.assertEquals("Lines to skip should match",
        0, processor.props.linesToSkip);

    runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
    runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
    runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
    runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
    runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
    runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");

    // check updates
    processor.createCSVProperties(context);
    Assert.assertEquals("Charset should match",
        "utf16", processor.props.charset);
    Assert.assertEquals("Delimiter should match",
        "|", processor.props.delimiter);
    Assert.assertEquals("Quote should match",
        "'", processor.props.quote);
    Assert.assertEquals("Escape should match",
        "\u2603", processor.props.escape);
    Assert.assertEquals("Header flag should match",
        true, processor.props.useHeader);
    Assert.assertEquals("Lines to skip should match",
        2, processor.props.linesToSkip);
  }
}
@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.spi.DefaultConfiguration;

public class TestConfigurationProperty {

  @Rule
  public final TemporaryFolder temp = new TemporaryFolder();
  public File confLocation;

  @Before
  public void saveConfiguration() throws IOException {
    Configuration conf = new Configuration(false);
    conf.setBoolean("nifi.config.canary", true);

    confLocation = temp.newFile("nifi-conf.xml");
    FileOutputStream out = new FileOutputStream(confLocation);
    conf.writeXml(out);
    out.close();
  }

  @Test
  public void testConfigurationCanary() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
    runner.setProperty(
        AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());

    Assert.assertFalse("Should not contain canary value",
        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));

    AbstractKiteProcessor processor = new StoreInKiteDataset();
    ProcessContext context = runner.getProcessContext();
    processor.setDefaultConfiguration(context);

    Assert.assertTrue("Should contain canary value",
        DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
  }

  @Test
  public void testFilesMustExist() throws IOException {
    TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
    runner.setProperty(
        AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
    runner.assertNotValid();
  }
}
@@ -0,0 +1,94 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;

import static org.apache.nifi.processors.kite.TestUtil.bytesFor;

public class TestGetSchema {

    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
            .requiredLong("id")
            .requiredString("color")
            .optionalDouble("price")
            .endRecord();

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @Test
    public void testSchemaFromFileSystem() throws IOException {
        File schemaFile = temp.newFile("schema.avsc");
        FileOutputStream out = new FileOutputStream(schemaFile);
        out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
        out.close();

        Schema schema = AbstractKiteProcessor.getSchema(
                schemaFile.toString(), DefaultConfiguration.get());

        Assert.assertEquals("Schema from file should match", SCHEMA, schema);
    }

    @Test
    public void testSchemaFromKiteURIs() throws IOException {
        String location = temp.newFolder("ns", "temp").toString();
        String datasetUri = "dataset:file:" + location;
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schema(SCHEMA)
                .build();

        Datasets.create(datasetUri, descriptor);

        Schema schema = AbstractKiteProcessor.getSchema(
                datasetUri, DefaultConfiguration.get());
        Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);

        schema = AbstractKiteProcessor.getSchema(
                "view:file:" + location + "?color=orange", DefaultConfiguration.get());
        Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
    }

    @Test
    public void testSchemaFromResourceURI() throws IOException {
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
                .build();
        Schema expected = descriptor.getSchema();

        Schema schema = AbstractKiteProcessor.getSchema(
                "resource:schema/user.avsc", DefaultConfiguration.get());

        Assert.assertEquals("Schema from resource URI should match",
                expected, schema);
    }
}
@@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.nifi.processors.kite.TestUtil.streamFor;

public class TestJSONToAvroProcessor {
    public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
            .requiredLong("id")
            .requiredString("color")
            .optionalDouble("price")
            .endRecord();

    public static final String JSON_CONTENT = "" +
            "{\"id\": 1,\"color\": \"green\"}" +
            "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
            "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";

    @Test
    public void testBasicConversion() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
        runner.assertNotValid();
        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
        runner.assertValid();

        runner.enqueue(streamFor(JSON_CONTENT));
        runner.run();

        long converted = runner.getCounterValue("Converted records");
        long errors = runner.getCounterValue("Conversion errors");
        Assert.assertEquals("Should convert 2 rows", 2, converted);
        Assert.assertEquals("Should reject 1 row", 1, errors);

        runner.assertAllFlowFilesTransferred("success", 1);
    }
}
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.minicluster.HdfsService;
import org.kitesdk.minicluster.HiveService;
import org.kitesdk.minicluster.MiniCluster;

import static org.apache.nifi.processors.kite.TestUtil.USER_SCHEMA;
import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import static org.apache.nifi.processors.kite.TestUtil.user;

public class TestKiteProcessorsCluster {

    public static MiniCluster cluster = null;
    public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
            .schema(USER_SCHEMA)
            .build();

    @BeforeClass
    public static void startCluster() throws IOException, InterruptedException {
        long rand = Math.abs((long) (Math.random() * 1000000));
        cluster = new MiniCluster.Builder()
                .workDir("/tmp/minicluster-" + rand)
                .clean(true)
                .addService(HdfsService.class)
                .addService(HiveService.class)
                .bindIP("127.0.0.1")
                .hiveMetastorePort(9083)
                .build();
        cluster.start();
    }

    @AfterClass
    public static void stopCluster() throws IOException, InterruptedException {
        if (cluster != null) {
            cluster.stop();
            cluster = null;
        }
    }

    @Test
    public void testBasicStoreToHive() throws IOException {
        String datasetUri = "dataset:hive:ns/test";

        Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);

        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.assertNotValid();

        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        List<Record> users = Lists.newArrayList(
                user("a", "a@example.com"),
                user("b", "b@example.com"),
                user("c", "c@example.com")
        );

        runner.enqueue(streamFor(users));
        runner.run();

        runner.assertAllFlowFilesTransferred("success", 1);
        List<Record> stored = Lists.newArrayList(
                (Iterable<Record>) dataset.newReader());
        Assert.assertEquals("Records should match", users, stored);

        Datasets.delete(datasetUri);
    }

    @Test
    public void testSchemaFromDistributedFileSystem() throws IOException {
        Schema expected = SchemaBuilder.record("Test").fields()
                .requiredLong("id")
                .requiredString("color")
                .optionalDouble("price")
                .endRecord();

        Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
        FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
        OutputStream out = fs.create(schemaPath);
        out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
        out.close();

        Schema schema = AbstractKiteProcessor.getSchema(
                schemaPath.toString(), DefaultConfiguration.get());

        Assert.assertEquals("Schema from file should match", expected, schema);
    }
}
@@ -0,0 +1,167 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;

import static org.apache.nifi.processors.kite.TestUtil.invalidStreamFor;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import static org.apache.nifi.processors.kite.TestUtil.user;

public class TestKiteStorageProcessor {

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    private String datasetUri = null;
    private Dataset<Record> dataset = null;

    @Before
    public void createDataset() throws Exception {
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
                .schema(TestUtil.USER_SCHEMA)
                .build();
        this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
        this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
    }

    @After
    public void deleteDataset() throws Exception {
        Datasets.delete(datasetUri);
    }

    @Test
    public void testBasicStore() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.assertNotValid();

        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        List<Record> users = Lists.newArrayList(
                user("a", "a@example.com"),
                user("b", "b@example.com"),
                user("c", "c@example.com")
        );

        runner.enqueue(streamFor(users));
        runner.run();

        runner.assertAllFlowFilesTransferred("success", 1);
        runner.assertQueueEmpty();
        Assert.assertEquals("Should store 3 values",
                3, (long) runner.getCounterValue("Stored records"));

        List<Record> stored = Lists.newArrayList(
                (Iterable<Record>) dataset.newReader());
        Assert.assertEquals("Records should match", users, stored);
    }

    @Test
    public void testViewURI() {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(
                StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
        runner.assertValid();
    }

    @Test
    public void testInvalidURI() {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(
                StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
        runner.assertNotValid();
    }

    @Test
    public void testUnreadableContent() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        runner.enqueue(invalidStreamFor(user("a", "a@example.com")));
        runner.run();

        runner.assertAllFlowFilesTransferred("failure", 1);
    }

    @Test
    public void testCorruptedBlocks() throws IOException {
        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        List<Record> records = Lists.newArrayList();
        for (int i = 0; i < 10000; i += 1) {
            String num = String.valueOf(i);
            records.add(user(num, num + "@example.com"));
        }

        runner.enqueue(invalidStreamFor(records));
        runner.run();

        long stored = runner.getCounterValue("Stored records");
        Assert.assertTrue("Should store some readable values",
                0 < stored && stored < 10000);

        runner.assertAllFlowFilesTransferred("success", 1);
    }

    @Test
    public void testIncompatibleSchema() throws IOException {
        Schema incompatible = SchemaBuilder.record("User").fields()
                .requiredLong("id")
                .requiredString("username")
                .optionalString("email") // the dataset requires this field
                .endRecord();

        // this user has the email field and could be stored, but the schema is
        // still incompatible so the entire stream is rejected
        Record incompatibleUser = new Record(incompatible);
        incompatibleUser.put("id", 1L);
        incompatibleUser.put("username", "a");
        incompatibleUser.put("email", "a@example.com");

        TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
        runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
        runner.assertValid();

        runner.enqueue(streamFor(incompatibleUser));
        runner.run();

        runner.assertAllFlowFilesTransferred("incompatible", 1);
    }
}
@@ -0,0 +1,103 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.nifi.processors.kite;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;

public class TestUtil {
    public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
            .requiredString("username")
            .requiredString("email")
            .endRecord();

    public static Record user(String username, String email) {
        Record user = new Record(USER_SCHEMA);
        user.put("username", username);
        user.put("email", email);
        return user;
    }

    public static InputStream streamFor(Record... records) throws IOException {
        return streamFor(Arrays.asList(records));
    }

    public static InputStream streamFor(List<Record> records) throws IOException {
        return new ByteArrayInputStream(bytesFor(records));
    }

    public static InputStream invalidStreamFor(Record... records) throws IOException {
        return invalidStreamFor(Arrays.asList(records));
    }

    public static InputStream invalidStreamFor(List<Record> records) throws IOException {
        // purposely truncate the content
        byte[] bytes = bytesFor(records);
        return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
    }

    private static byte[] bytesFor(List<Record> records) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataFileWriter<Record> writer = new DataFileWriter<>(
                AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
        writer.setCodec(CodecFactory.snappyCodec());
        writer = writer.create(records.get(0).getSchema(), out);

        for (Record record : records) {
            writer.append(record);
        }

        writer.flush();

        return out.toByteArray();
    }

    public static InputStream streamFor(String content) throws CharacterCodingException {
        return streamFor(content, Charset.forName("utf8"));
    }

    public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
        return new ByteArrayInputStream(bytesFor(content, charset));
    }

    public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
        CharBuffer chars = CharBuffer.wrap(content);
        CharsetEncoder encoder = charset.newEncoder();
        ByteBuffer buffer = encoder.encode(chars);
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return bytes;
    }
}
@@ -0,0 +1,59 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!--
      Licensed to the Apache Software Foundation (ASF) under one or more
      contributor license agreements. See the NOTICE file distributed with
      this work for additional information regarding copyright ownership.
      The ASF licenses this file to You under the Apache License, Version 2.0
      (the "License"); you may not use this file except in compliance with
      the License. You may obtain a copy of the License at
          http://www.apache.org/licenses/LICENSE-2.0
      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
    -->
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-nar-bundles</artifactId>
        <version>0.0.2-incubating-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-kite-bundle</artifactId>
    <packaging>pom</packaging>

    <name>Kite Bundle</name>
    <description>A bundle of processors that use Kite to store data in Hadoop</description>

    <modules>
        <module>nifi-kite-processors</module>
        <module>nifi-kite-nar</module>
    </modules>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <configuration>
                        <redirectTestOutputToFile>true</redirectTestOutputToFile>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-kite-processors</artifactId>
                <version>0.0.2-incubating-SNAPSHOT</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

</project>