Merge branch 'NIFI-238-kite-processors' of https://github.com/rdblue/incubator-nifi into develop

This commit is contained in:
Mark Payne 2015-03-05 12:28:16 -05:00
commit dd879f2324
18 changed files with 2144 additions and 0 deletions

View File

@ -0,0 +1,35 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kite-nar</artifactId>
<packaging>nar</packaging>
<name>Kite NAR</name>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,148 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-bundle</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kite-processors</artifactId>
<packaging>jar</packaging>
<name>Kite Hadoop Processors</name>
<properties>
<kite.version>0.18.0</kite.version>
<guava.version>11.0.2</guava.version>
<junit.version>4.10</junit.version>
<findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>
</properties>
<dependencies>
<!-- NiFi -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
</dependency>
<!-- Kite -->
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-data-core</artifactId>
<version>${kite.version}</version>
<exclusions>
<exclusion>
<!-- Use findbugs-annotations instead -->
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-hadoop-dependencies</artifactId>
<type>pom</type>
<version>${kite.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-app</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<!-- avoid warnings by bundling annotations -->
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<scope>compile</scope>
<version>${findbugs-annotations.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-minicluster</artifactId>
<version>${kite.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-servlet</artifactId>
<version>1.14</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-data-core</artifactId>
<version>${kite.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-hadoop-test-dependencies</artifactId>
<type>pom</type>
<scope>test</scope>
<version>${kite.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Resources;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.kitesdk.data.DatasetNotFoundException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.SchemaNotFoundException;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.spi.DefaultConfiguration;
abstract class AbstractKiteProcessor extends AbstractProcessor {
private static final Splitter COMMA = Splitter.on(',').trimResults();
protected static final Validator FILES_EXIST = new Validator() {
@Override
public ValidationResult validate(String subject, String configFiles,
ValidationContext context) {
if (configFiles != null && !configFiles.isEmpty()) {
for (String file : COMMA.split(configFiles)) {
ValidationResult result = StandardValidators.FILE_EXISTS_VALIDATOR
.validate(subject, file, context);
if (!result.isValid()) {
return result;
}
}
}
return new ValidationResult.Builder()
.subject(subject)
.input(configFiles)
.explanation("Files exist")
.valid(true)
.build();
}
};
protected static final PropertyDescriptor CONF_XML_FILES =
new PropertyDescriptor.Builder()
.name("Hadoop configuration files")
.description("A comma-separated list of Hadoop configuration files")
.addValidator(FILES_EXIST)
.build();
protected static final Validator RECOGNIZED_URI = new Validator() {
@Override
public ValidationResult validate(String subject, String uri,
ValidationContext context) {
String message = "not set";
boolean isValid = true;
if (uri == null || uri.isEmpty()) {
isValid = false;
} else {
try {
new URIBuilder(URI.create(uri)).build();
} catch (RuntimeException e) {
message = e.getMessage();
isValid = false;
}
}
return new ValidationResult.Builder()
.subject(subject)
.input(uri)
.explanation("Dataset URI is invalid: " + message)
.valid(isValid)
.build();
}
};
/**
* Resolves a {@link Schema} for the given string, either a URI or a JSON
* literal.
*/
protected static Schema getSchema(String uriOrLiteral, Configuration conf) {
URI uri;
try {
uri = new URI(uriOrLiteral);
} catch (URISyntaxException e) {
// try to parse the schema as a literal
return parseSchema(uriOrLiteral);
}
try {
if ("dataset".equals(uri.getScheme()) || "view".equals(uri.getScheme())) {
return Datasets.load(uri).getDataset().getDescriptor().getSchema();
} else if ("resource".equals(uri.getScheme())) {
InputStream in = Resources.getResource(uri.getSchemeSpecificPart())
.openStream();
return parseSchema(uri, in);
} else {
// try to open the file
Path schemaPath = new Path(uri);
FileSystem fs = schemaPath.getFileSystem(conf);
return parseSchema(uri, fs.open(schemaPath));
}
} catch (DatasetNotFoundException e) {
throw new SchemaNotFoundException(
"Cannot read schema of missing dataset: " + uri, e);
} catch (IOException e) {
throw new SchemaNotFoundException(
"Failed while reading " + uri + ": " + e.getMessage(), e);
}
}
private static Schema parseSchema(String literal) {
try {
return new Schema.Parser().parse(literal);
} catch (RuntimeException e) {
throw new SchemaNotFoundException(
"Failed to parse schema: " + literal, e);
}
}
private static Schema parseSchema(URI uri, InputStream in) throws IOException {
try {
return new Schema.Parser().parse(in);
} catch (RuntimeException e) {
throw new SchemaNotFoundException("Failed to parse schema at " + uri, e);
}
}
protected static final Validator SCHEMA_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String uri, ValidationContext context) {
Configuration conf = getConfiguration(
context.getProperty(CONF_XML_FILES).getValue());
String error = null;
try {
getSchema(uri, conf);
} catch (SchemaNotFoundException e) {
error = e.getMessage();
}
return new ValidationResult.Builder()
.subject(subject)
.input(uri)
.explanation(error)
.valid(error == null)
.build();
}
};
protected static final List<PropertyDescriptor> ABSTRACT_KITE_PROPS =
ImmutableList.<PropertyDescriptor>builder()
.add(CONF_XML_FILES)
.build();
static List<PropertyDescriptor> getProperties() {
return ABSTRACT_KITE_PROPS;
}
@OnScheduled
protected void setDefaultConfiguration(ProcessContext context)
throws IOException {
DefaultConfiguration.set(getConfiguration(
context.getProperty(CONF_XML_FILES).getValue()));
}
protected static Configuration getConfiguration(String configFiles) {
Configuration conf = DefaultConfiguration.get();
if (configFiles == null || configFiles.isEmpty()) {
return conf;
}
for (String file : COMMA.split(configFiles)) {
// process each resource only once
if (conf.getResource(file) == null) {
// use Path instead of String to get the file from the FS
conf.addResource(new Path(file));
}
}
return conf;
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return ABSTRACT_KITE_PROPS;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import static org.apache.avro.generic.GenericData.StringType;
class AvroUtil {
@SuppressWarnings("unchecked")
public static <D> DatumWriter<D> newDatumWriter(Schema schema, Class<D> dClass) {
return (DatumWriter<D>) GenericData.get().createDatumWriter(schema);
}
@SuppressWarnings("unchecked")
public static <D> DatumReader<D> newDatumReader(Schema schema, Class<D> dClass) {
return (DatumReader<D>) GenericData.get().createDatumReader(schema);
}
}

View File

@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.filesystem.CSVFileReaderFixed;
import org.kitesdk.data.spi.filesystem.CSVProperties;
import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
@Tags({"kite", "csv", "avro"})
@CapabilityDescription(
"Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro extends AbstractKiteProcessor {
private static CSVProperties DEFAULTS = new CSVProperties.Builder().build();
private static Validator CHAR_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input,
ValidationContext context) {
return new ValidationResult.Builder()
.subject(subject)
.input(input)
.explanation("Only single characters are supported")
.valid(input.length() == 1)
.build();
}
};
private static Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
private static Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
@VisibleForTesting
static final PropertyDescriptor SCHEMA =
new PropertyDescriptor.Builder()
.name("Record schema")
.description(
"Outgoing Avro schema for each record created from a CSV row")
.addValidator(SCHEMA_VALIDATOR)
.required(true)
.build();
@VisibleForTesting
static final PropertyDescriptor CHARSET =
new PropertyDescriptor.Builder()
.name("CSV charset")
.description("Character set for CSV files")
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(DEFAULTS.charset)
.build();
@VisibleForTesting
static final PropertyDescriptor DELIMITER =
new PropertyDescriptor.Builder()
.name("CSV delimiter")
.description("Delimiter character for CSV records")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.delimiter)
.build();
@VisibleForTesting
static final PropertyDescriptor QUOTE =
new PropertyDescriptor.Builder()
.name("CSV quote character")
.description("Quote character for CSV values")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.quote)
.build();
@VisibleForTesting
static final PropertyDescriptor ESCAPE =
new PropertyDescriptor.Builder()
.name("CSV escape character")
.description("Escape character for CSV values")
.addValidator(CHAR_VALIDATOR)
.defaultValue(DEFAULTS.escape)
.build();
@VisibleForTesting
static final PropertyDescriptor HAS_HEADER =
new PropertyDescriptor.Builder()
.name("Use CSV header line")
.description("Whether to use the first line as a header")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue(String.valueOf(DEFAULTS.useHeader))
.build();
@VisibleForTesting
static final PropertyDescriptor LINES_TO_SKIP =
new PropertyDescriptor.Builder()
.name("Lines to skip")
.description("Number of lines to skip before reading header or data")
.addValidator(createLongValidator(0L, Integer.MAX_VALUE, true))
.defaultValue(String.valueOf(DEFAULTS.linesToSkip))
.build();
private static final List<PropertyDescriptor> PROPERTIES =
ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(SCHEMA)
.add(CHARSET)
.add(DELIMITER)
.add(QUOTE)
.add(ESCAPE)
.add(HAS_HEADER)
.add(LINES_TO_SKIP)
.build();
private static final Set<Relationship> RELATIONSHIPS =
ImmutableSet.<Relationship>builder()
.add(SUCCESS)
.add(FAILURE)
.build();
// Immutable configuration
@VisibleForTesting
volatile CSVProperties props;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@OnScheduled
public void createCSVProperties(ProcessContext context) throws IOException {
super.setDefaultConfiguration(context);
this.props = new CSVProperties.Builder()
.charset(context.getProperty(CHARSET).getValue())
.delimiter(context.getProperty(DELIMITER).getValue())
.quote(context.getProperty(QUOTE).getValue())
.escape(context.getProperty(ESCAPE).getValue())
.hasHeader(context.getProperty(HAS_HEADER).asBoolean())
.linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger())
.build();
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Schema schema = getSchema(
context.getProperty(SCHEMA).getValue(),
DefaultConfiguration.get());
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(schema, Record.class));
writer.setCodec(CodecFactory.snappyCodec());
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
long written = 0L;
long errors = 0L;
try (CSVFileReaderFixed<Record> reader = new CSVFileReaderFixed<>(
in, props, schema, Record.class)) {
reader.initialize();
try (DataFileWriter<Record> w = writer.create(schema, out)) {
while (reader.hasNext()) {
try {
Record record = reader.next();
w.append(record);
written += 1;
} catch (DatasetRecordException e) {
errors += 1;
}
}
}
}
session.adjustCounter("Converted records", written,
false /* update only if file transfer is successful */);
session.adjustCounter("Conversion errors", errors,
false /* update only if file transfer is successful */);
}
});
session.transfer(flowFile, SUCCESS);
//session.getProvenanceReporter().send(flowFile, target.getUri().toString());
} catch (ProcessException | DatasetIOException e) {
getLogger().error("Failed reading or writing", e);
session.transfer(flowFile, FAILURE);
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(flowFile, FAILURE);
} catch (Throwable t) {
getLogger().error("Unknown Throwable", t);
session.rollback(true); // penalize just in case
context.yield();
}
}
}

View File

@ -0,0 +1,157 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.StreamCallback;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.spi.DefaultConfiguration;
@Tags({"kite", "json", "avro"})
@CapabilityDescription(
"Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro extends AbstractKiteProcessor {
private static Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
private static Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
@VisibleForTesting
static final PropertyDescriptor SCHEMA =
new PropertyDescriptor.Builder()
.name("Record schema")
.description(
"Outgoing Avro schema for each record created from a JSON object")
.addValidator(SCHEMA_VALIDATOR)
.required(true)
.build();
private static final List<PropertyDescriptor> PROPERTIES =
ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(SCHEMA)
.build();
private static final Set<Relationship> RELATIONSHIPS =
ImmutableSet.<Relationship>builder()
.add(SUCCESS)
.add(FAILURE)
.build();
public ConvertJSONToAvro() {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final Schema schema = getSchema(
context.getProperty(SCHEMA).getValue(),
DefaultConfiguration.get());
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(schema, Record.class));
writer.setCodec(CodecFactory.snappyCodec());
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(InputStream in, OutputStream out) throws IOException {
long written = 0L;
long errors = 0L;
try (JSONFileReader<Record> reader = new JSONFileReader<>(
in, schema, Record.class)) {
reader.initialize();
try (DataFileWriter<Record> w = writer.create(schema, out)) {
while (reader.hasNext()) {
try {
Record record = reader.next();
w.append(record);
written += 1;
} catch (DatasetRecordException e) {
errors += 1;
}
}
}
session.adjustCounter("Converted records", written,
false /* update only if file transfer is successful */);
session.adjustCounter("Conversion errors", errors,
false /* update only if file transfer is successful */);
}
}
});
session.transfer(flowFile, SUCCESS);
//session.getProvenanceReporter().send(flowFile, target.getUri().toString());
} catch (ProcessException | DatasetIOException e) {
getLogger().error("Failed reading or writing", e);
session.transfer(flowFile, FAILURE);
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(flowFile, FAILURE);
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.JsonUtil;
import org.kitesdk.data.spi.ReaderWriterState;
/**
* This is a temporary addition. The version in 0.18.0 throws a NPE when the
* InputStream constructor is used.
*/
class JSONFileReader<E> extends AbstractDatasetReader<E> {
private final GenericData model;
private final Schema schema;
private InputStream incoming = null;
// state
private ReaderWriterState state = ReaderWriterState.NEW;
private Iterator<E> iterator;
public JSONFileReader(InputStream incoming, Schema schema, Class<E> type) {
this.incoming = incoming;
this.schema = schema;
this.model = DataModelUtil.getDataModelForType(type);
this.state = ReaderWriterState.NEW;
}
@Override
public void initialize() {
Preconditions.checkState(state.equals(ReaderWriterState.NEW),
"A reader may not be opened more than once - current state:%s", state);
Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
"Schemas for JSON files should be record");
this.iterator = Iterators.transform(JsonUtil.parser(incoming),
new Function<JsonNode, E>() {
@Override
@SuppressWarnings("unchecked")
public E apply(@Nullable JsonNode node) {
return (E) JsonUtil.convertToAvro(model, node, schema);
}
});
this.state = ReaderWriterState.OPEN;
}
@Override
public boolean hasNext() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
return iterator.hasNext();
}
@Override
public E next() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
return iterator.next();
}
@Override
public void close() {
if (!state.equals(ReaderWriterState.OPEN)) {
return;
}
iterator = null;
try {
incoming.close();
} catch (IOException e) {
throw new DatasetIOException("Unable to close reader path", e);
}
state = ReaderWriterState.CLOSED;
}
@Override
public boolean isOpen() {
return (this.state == ReaderWriterState.OPEN);
}
}

View File

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.util.StopWatch;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.IncompatibleSchemaException;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.SchemaValidationUtil;
@Tags({"kite", "avro", "parquet", "hive", "hdfs", "hbase"})
@CapabilityDescription("Stores Avro records in a Kite dataset")
public class StoreInKiteDataset extends AbstractKiteProcessor {
private static Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFile content has been successfully saved")
.build();
private static Relationship INCOMPATIBLE = new Relationship.Builder()
.name("incompatible")
.description("FlowFile content is not compatible with the target dataset")
.build();
private static Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFile content could not be processed")
.build();
public static final PropertyDescriptor KITE_DATASET_URI =
new PropertyDescriptor.Builder()
.name("Target dataset URI")
.description(
"URI that identifies a Kite dataset where data will be stored")
.addValidator(RECOGNIZED_URI)
.expressionLanguageSupported(true)
.required(true)
.build();
private static final List<PropertyDescriptor> PROPERTIES =
ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(KITE_DATASET_URI)
.build();
private static final Set<Relationship> RELATIONSHIPS =
ImmutableSet.<Relationship>builder()
.add(SUCCESS)
.add(INCOMPATIBLE)
.add(FAILURE)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(ProcessContext context, final ProcessSession session)
throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final View<Record> target = load(context, flowFile);
final Schema schema = target.getDataset().getDescriptor().getSchema();
try {
StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try (DataFileStream<Record> stream = new DataFileStream<>(
in, AvroUtil.newDatumReader(schema, Record.class))) {
IncompatibleSchemaException.check(
SchemaValidationUtil.canRead(stream.getSchema(), schema),
"Incompatible file schema %s, expected %s",
stream.getSchema(), schema);
long written = 0L;
try (DatasetWriter<Record> writer = target.newWriter()) {
for (Record record : stream) {
writer.write(record);
written += 1;
}
} finally {
session.adjustCounter("Stored records", written,
true /* cannot roll back the write */);
}
}
}
});
timer.stop();
session.getProvenanceReporter().send(flowFile,
target.getUri().toString(),
timer.getDuration(TimeUnit.MILLISECONDS),
true /* cannot roll back the write */ );
session.transfer(flowFile, SUCCESS);
} catch (ProcessException | DatasetIOException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(flowFile, FAILURE);
} catch (ValidationException e) {
getLogger().error(e.getMessage());
getLogger().debug("Incompatible schema error", e);
session.transfer(flowFile, INCOMPATIBLE);
} catch (Throwable t) {
getLogger().error("Unknown Throwable", t);
session.rollback(true); // penalize just in case
context.yield();
}
}
private View<Record> load(ProcessContext context, FlowFile file) {
String uri = context.getProperty(KITE_DATASET_URI)
.evaluateAttributeExpressions(file)
.getValue();
return Datasets.load(uri, Record.class);
}
}

View File

@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kitesdk.data.spi.filesystem;
import au.com.bytecode.opencsv.CSVReader;
import java.io.InputStream;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.ReaderWriterState;
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.spi.filesystem.CSVProperties;
import org.kitesdk.data.spi.filesystem.CSVUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.NoSuchElementException;
import static org.kitesdk.data.spi.filesystem.FileSystemProperties.REUSE_RECORDS;
/**
* This is a temporary addition. The version in 0.18.0 throws a NPE when the
* InputStream constructor is used.
*/
public class CSVFileReaderFixed<E> extends AbstractDatasetReader<E> {
private final CSVProperties props;
private final Schema schema;
private final boolean reuseRecords;
private final Class<E> recordClass;
private CSVReader reader = null;
private CSVRecordBuilder<E> builder;
private InputStream incoming = null;
// state
private ReaderWriterState state = ReaderWriterState.NEW;
private boolean hasNext = false;
private String[] next = null;
private E record = null;
public CSVFileReaderFixed(InputStream incoming, CSVProperties props,
Schema schema, Class<E> type) {
this.incoming = incoming;
this.schema = schema;
this.recordClass = type;
this.state = ReaderWriterState.NEW;
this.props = props;
this.reuseRecords = false;
Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
"Schemas for CSV files must be records of primitive types");
}
@Override
@SuppressWarnings("unchecked")
public void initialize() {
Preconditions.checkState(state.equals(ReaderWriterState.NEW),
"A reader may not be opened more than once - current state:%s", state);
this.reader = CSVUtil.newReader(incoming, props);
List<String> header = null;
if (props.useHeader) {
this.hasNext = advance();
header = Lists.newArrayList(next);
} else if (props.header != null) {
try {
header = Lists.newArrayList(
CSVUtil.newParser(props).parseLine(props.header));
} catch (IOException e) {
throw new DatasetIOException(
"Failed to parse header from properties: " + props.header, e);
}
}
this.builder = new CSVRecordBuilder<E>(schema, recordClass, header);
// initialize by reading the first record
this.hasNext = advance();
this.state = ReaderWriterState.OPEN;
}
@Override
public boolean hasNext() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
return hasNext;
}
@Override
public E next() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
if (!hasNext) {
throw new NoSuchElementException();
}
try {
if (reuseRecords) {
this.record = builder.makeRecord(next, record);
return record;
} else {
return builder.makeRecord(next, null);
}
} finally {
this.hasNext = advance();
}
}
private boolean advance() {
try {
next = reader.readNext();
} catch (IOException e) {
throw new DatasetIOException("Could not read record", e);
}
return (next != null);
}
@Override
public void close() {
if (!state.equals(ReaderWriterState.OPEN)) {
return;
}
try {
reader.close();
} catch (IOException e) {
throw new DatasetIOException("Unable to close reader", e);
}
state = ReaderWriterState.CLOSED;
}
@Override
public boolean isOpen() {
return (this.state == ReaderWriterState.OPEN);
}
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.kite.StoreInKiteDataset
org.apache.nifi.processors.kite.ConvertCSVToAvro
org.apache.nifi.processors.kite.ConvertJSONToAvro

View File

@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
public class TestCSVToAvroProcessor {
public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
.requiredLong("id")
.requiredString("color")
.optionalDouble("price")
.endRecord();
public static final String CSV_CONTENT = "" +
"1,green\n" +
",blue,\n" + // invalid, ID is missing
"2,grey,12.95";
@Test
public void testBasicConversion() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.assertNotValid();
runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
runner.assertValid();
runner.enqueue(streamFor(CSV_CONTENT));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 row", 1, errors);
runner.assertAllFlowFilesTransferred("success", 1);
}
@Test
public void testAlternateCharset() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
runner.assertValid();
runner.enqueue(streamFor(CSV_CONTENT, Charset.forName("UTF-16")));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 row", 1, errors);
runner.assertAllFlowFilesTransferred("success", 1);
}
@Test
public void testCSVProperties() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
ConvertCSVToAvro processor = new ConvertCSVToAvro();
ProcessContext context = runner.getProcessContext();
// check defaults
processor.createCSVProperties(context);
Assert.assertEquals("Charset should match",
"utf8", processor.props.charset);
Assert.assertEquals("Delimiter should match",
",", processor.props.delimiter);
Assert.assertEquals("Quote should match",
"\"", processor.props.quote);
Assert.assertEquals("Escape should match",
"\\", processor.props.escape);
Assert.assertEquals("Header flag should match",
false, processor.props.useHeader);
Assert.assertEquals("Lines to skip should match",
0, processor.props.linesToSkip);
runner.setProperty(ConvertCSVToAvro.CHARSET, "utf16");
runner.setProperty(ConvertCSVToAvro.DELIMITER, "|");
runner.setProperty(ConvertCSVToAvro.QUOTE, "'");
runner.setProperty(ConvertCSVToAvro.ESCAPE, "\u2603");
runner.setProperty(ConvertCSVToAvro.HAS_HEADER, "true");
runner.setProperty(ConvertCSVToAvro.LINES_TO_SKIP, "2");
// check updates
processor.createCSVProperties(context);
Assert.assertEquals("Charset should match",
"utf16", processor.props.charset);
Assert.assertEquals("Delimiter should match",
"|", processor.props.delimiter);
Assert.assertEquals("Quote should match",
"'", processor.props.quote);
Assert.assertEquals("Escape should match",
"\u2603", processor.props.escape);
Assert.assertEquals("Header flag should match",
true, processor.props.useHeader);
Assert.assertEquals("Lines to skip should match",
2, processor.props.linesToSkip);
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.spi.DefaultConfiguration;
public class TestConfigurationProperty {
@Rule
public final TemporaryFolder temp = new TemporaryFolder();
public File confLocation;
@Before
public void saveConfiguration() throws IOException {
Configuration conf = new Configuration(false);
conf.setBoolean("nifi.config.canary", true);
confLocation = temp.newFile("nifi-conf.xml");
FileOutputStream out = new FileOutputStream(confLocation);
conf.writeXml(out);
out.close();
}
@Test
public void testConfigurationCanary() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(
AbstractKiteProcessor.CONF_XML_FILES, confLocation.toString());
Assert.assertFalse("Should not contain canary value",
DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
AbstractKiteProcessor processor = new StoreInKiteDataset();
ProcessContext context = runner.getProcessContext();
processor.setDefaultConfiguration(context);
Assert.assertTrue("Should contain canary value",
DefaultConfiguration.get().getBoolean("nifi.config.canary", false));
}
@Test
public void testFilesMustExist() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(
AbstractKiteProcessor.CONF_XML_FILES, temp.newFile().toString());
runner.assertNotValid();
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;
import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
public class TestGetSchema {
public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
.requiredLong("id")
.requiredString("color")
.optionalDouble("price")
.endRecord();
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@Test
public void testSchemaFromFileSystem() throws IOException {
File schemaFile = temp.newFile("schema.avsc");
FileOutputStream out = new FileOutputStream(schemaFile);
out.write(bytesFor(SCHEMA.toString(), Charset.forName("utf8")));
out.close();
Schema schema = AbstractKiteProcessor.getSchema(
schemaFile.toString(), DefaultConfiguration.get());
Assert.assertEquals("Schema from file should match", SCHEMA, schema);
}
@Test
public void testSchemaFromKiteURIs() throws IOException {
String location = temp.newFolder("ns", "temp").toString();
String datasetUri = "dataset:file:" + location;
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(SCHEMA)
.build();
Datasets.create(datasetUri, descriptor);
Schema schema = AbstractKiteProcessor.getSchema(
datasetUri, DefaultConfiguration.get());
Assert.assertEquals("Schema from dataset URI should match", SCHEMA, schema);
schema = AbstractKiteProcessor.getSchema(
"view:file:" + location + "?color=orange", DefaultConfiguration.get());
Assert.assertEquals("Schema from view URI should match", SCHEMA, schema);
}
@Test
public void testSchemaFromResourceURI() throws IOException {
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schemaUri("resource:schema/user.avsc") // in kite-data-core test-jar
.build();
Schema expected = descriptor.getSchema();
Schema schema = AbstractKiteProcessor.getSchema(
"resource:schema/user.avsc", DefaultConfiguration.get());
Assert.assertEquals("Schema from resource URI should match",
expected, schema);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
public class TestJSONToAvroProcessor {
public static final Schema SCHEMA = SchemaBuilder.record("Test").fields()
.requiredLong("id")
.requiredString("color")
.optionalDouble("price")
.endRecord();
public static final String JSON_CONTENT = "" +
"{\"id\": 1,\"color\": \"green\"}" +
"{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is a string
"{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
@Test
public void testBasicConversion() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
runner.assertNotValid();
runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
runner.assertValid();
runner.enqueue(streamFor(JSON_CONTENT));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 row", 1, errors);
runner.assertAllFlowFilesTransferred("success", 1);
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.minicluster.HdfsService;
import org.kitesdk.minicluster.HiveService;
import org.kitesdk.minicluster.MiniCluster;
import static org.apache.nifi.processors.kite.TestUtil.USER_SCHEMA;
import static org.apache.nifi.processors.kite.TestUtil.bytesFor;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import static org.apache.nifi.processors.kite.TestUtil.user;
public class TestKiteProcessorsCluster {
public static MiniCluster cluster = null;
public static DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(USER_SCHEMA)
.build();
@BeforeClass
public static void startCluster() throws IOException, InterruptedException {
long rand = Math.abs((long) (Math.random() * 1000000));
cluster = new MiniCluster.Builder()
.workDir("/tmp/minicluster-" + rand)
.clean(true)
.addService(HdfsService.class)
.addService(HiveService.class)
.bindIP("127.0.0.1")
.hiveMetastorePort(9083)
.build();
cluster.start();
}
@AfterClass
public static void stopCluster() throws IOException, InterruptedException {
if (cluster != null) {
cluster.stop();
cluster = null;
}
}
@Test
public void testBasicStoreToHive() throws IOException {
String datasetUri = "dataset:hive:ns/test";
Dataset<Record> dataset = Datasets.create(datasetUri, descriptor, Record.class);
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.assertNotValid();
runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
runner.assertValid();
List<Record> users = Lists.newArrayList(
user("a", "a@example.com"),
user("b", "b@example.com"),
user("c", "c@example.com")
);
runner.enqueue(streamFor(users));
runner.run();
runner.assertAllFlowFilesTransferred("success", 1);
List<Record> stored = Lists.newArrayList(
(Iterable<Record>) dataset.newReader());
Assert.assertEquals("Records should match", users, stored);
Datasets.delete(datasetUri);
}
@Test
public void testSchemaFromDistributedFileSystem() throws IOException {
Schema expected = SchemaBuilder.record("Test").fields()
.requiredLong("id")
.requiredString("color")
.optionalDouble("price")
.endRecord();
Path schemaPath = new Path("hdfs:/tmp/schema.avsc");
FileSystem fs = schemaPath.getFileSystem(DefaultConfiguration.get());
OutputStream out = fs.create(schemaPath);
out.write(bytesFor(expected.toString(), Charset.forName("utf8")));
out.close();
Schema schema = AbstractKiteProcessor.getSchema(
schemaPath.toString(), DefaultConfiguration.get());
Assert.assertEquals("Schema from file should match", expected, schema);
}
}

View File

@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.Datasets;
import static org.apache.nifi.processors.kite.TestUtil.invalidStreamFor;
import static org.apache.nifi.processors.kite.TestUtil.streamFor;
import static org.apache.nifi.processors.kite.TestUtil.user;
public class TestKiteStorageProcessor {
@Rule
public TemporaryFolder temp = new TemporaryFolder();
private String datasetUri = null;
private Dataset<Record> dataset = null;
@Before
public void createDataset() throws Exception {
DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
.schema(TestUtil.USER_SCHEMA)
.build();
this.datasetUri = "dataset:file:" + temp.newFolder("ns", "temp").toString();
this.dataset = Datasets.create(datasetUri, descriptor, Record.class);
}
@After
public void deleteDataset() throws Exception {
Datasets.delete(datasetUri);
}
@Test
public void testBasicStore() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.assertNotValid();
runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
runner.assertValid();
List<Record> users = Lists.newArrayList(
user("a", "a@example.com"),
user("b", "b@example.com"),
user("c", "c@example.com")
);
runner.enqueue(streamFor(users));
runner.run();
runner.assertAllFlowFilesTransferred("success", 1);
runner.assertQueueEmpty();
Assert.assertEquals("Should store 3 values",
3, (long) runner.getCounterValue("Stored records"));
List<Record> stored = Lists.newArrayList(
(Iterable<Record>) dataset.newReader());
Assert.assertEquals("Records should match", users, stored);
}
@Test
public void testViewURI() {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(
StoreInKiteDataset.KITE_DATASET_URI, "view:hive:ns/table?year=2015");
runner.assertValid();
}
@Test
public void testInvalidURI() {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(
StoreInKiteDataset.KITE_DATASET_URI, "dataset:unknown");
runner.assertNotValid();
}
@Test
public void testUnreadableContent() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
runner.assertValid();
runner.enqueue(invalidStreamFor(user("a", "a@example.com")));
runner.run();
runner.assertAllFlowFilesTransferred("failure", 1);
}
@Test
public void testCorruptedBlocks() throws IOException {
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
runner.assertValid();
List<Record> records = Lists.newArrayList();
for (int i = 0; i < 10000; i += 1) {
String num = String.valueOf(i);
records.add(user(num, num + "@example.com"));
}
runner.enqueue(invalidStreamFor(records));
runner.run();
long stored = runner.getCounterValue("Stored records");
Assert.assertTrue("Should store some readable values",
0 < stored && stored < 10000);
runner.assertAllFlowFilesTransferred("success", 1);
}
@Test
public void testIncompatibleSchema() throws IOException {
Schema incompatible = SchemaBuilder.record("User").fields()
.requiredLong("id")
.requiredString("username")
.optionalString("email") // the dataset requires this field
.endRecord();
// this user has the email field and could be stored, but the schema is
// still incompatible so the entire stream is rejected
Record incompatibleUser = new Record(incompatible);
incompatibleUser.put("id", 1L);
incompatibleUser.put("username", "a");
incompatibleUser.put("email", "a@example.com");
TestRunner runner = TestRunners.newTestRunner(StoreInKiteDataset.class);
runner.setProperty(StoreInKiteDataset.KITE_DATASET_URI, datasetUri);
runner.assertValid();
runner.enqueue(streamFor(incompatibleUser));
runner.run();
runner.assertAllFlowFilesTransferred("incompatible", 1);
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
public class TestUtil {
public static final Schema USER_SCHEMA = SchemaBuilder.record("User").fields()
.requiredString("username")
.requiredString("email")
.endRecord();
public static Record user(String username, String email) {
Record user = new Record(USER_SCHEMA);
user.put("username", username);
user.put("email", email);
return user;
}
public static InputStream streamFor(Record... records) throws IOException {
return streamFor(Arrays.asList(records));
}
public static InputStream streamFor(List<Record> records) throws IOException {
return new ByteArrayInputStream(bytesFor(records));
}
public static InputStream invalidStreamFor(Record... records) throws IOException {
return invalidStreamFor(Arrays.asList(records));
}
public static InputStream invalidStreamFor(List<Record> records) throws IOException {
// purposely truncate the content
byte[] bytes = bytesFor(records);
return new ByteArrayInputStream(bytes, 0, bytes.length / 2);
}
private static byte[] bytesFor(List<Record> records) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(records.get(0).getSchema(), Record.class));
writer.setCodec(CodecFactory.snappyCodec());
writer = writer.create(records.get(0).getSchema(), out);
for (Record record : records) {
writer.append(record);
}
writer.flush();
return out.toByteArray();
}
public static InputStream streamFor(String content) throws CharacterCodingException {
return streamFor(content, Charset.forName("utf8"));
}
public static InputStream streamFor(String content, Charset charset) throws CharacterCodingException {
return new ByteArrayInputStream(bytesFor(content, charset));
}
public static byte[] bytesFor(String content, Charset charset) throws CharacterCodingException {
CharBuffer chars = CharBuffer.wrap(content);
CharsetEncoder encoder = charset.newEncoder();
ByteBuffer buffer = encoder.encode(chars);
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
}
}

View File

@ -0,0 +1,59 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</parent>
<artifactId>nifi-kite-bundle</artifactId>
<packaging>pom</packaging>
<name>Kite Bundle</name>
<description>A bundle of processors that use Kite to store data in Hadoop</description>
<modules>
<module>nifi-kite-processors</module>
<module>nifi-kite-nar</module>
</modules>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-kite-processors</artifactId>
<version>0.0.2-incubating-SNAPSHOT</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>