NIFI-428: Update to Kite 1.0.0.

Kite 1.0.0 fixes the JSON and CSV readers (the 0.18.0 versions threw an NPE when
constructed from an InputStream), so the patched copies bundled in NiFi are no
longer needed.
Author: Ryan Blue
Date:   2015-03-16 20:53:39 -07:00
parent  eb757a484e
commit  37645b599e

8 changed files with 39 additions and 309 deletions
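Note on the schema property change below: with .expressionLanguageSupported(true) on the
SCHEMA property, the schema URI can now come from a FlowFile attribute. A minimal sketch of
how that might be exercised with NiFi's mock test runner, assuming the sketch lives in the
same package so the package-private SCHEMA descriptor is visible; the attribute name
schema.uri and the dataset URI are placeholders, not part of this commit:

package org.apache.nifi.processors.kite;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class SchemaExpressionSketch {

  public static void main(String[] args) {
    TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);

    // An Expression Language reference instead of a literal URI; the validator
    // changes below skip values containing '$' for exactly this case.
    runner.setProperty(ConvertCSVToAvro.SCHEMA, "${schema.uri}");

    // The real schema URI arrives per FlowFile as an attribute and is resolved
    // by evaluateAttributeExpressions(flowFile) in onTrigger.
    Map<String, String> attributes = new HashMap<>();
    attributes.put("schema.uri", "dataset:file:/tmp/ns/users"); // placeholder URI

    runner.enqueue("alice,1\n".getBytes(StandardCharsets.UTF_8), attributes);
    runner.run();
  }
}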

View File

@@ -25,7 +25,7 @@
     <packaging>jar</packaging>
     <properties>
-        <kite.version>0.18.0</kite.version>
+        <kite.version>1.0.0</kite.version>
         <guava.version>11.0.2</guava.version>
         <junit.version>4.10</junit.version>
         <findbugs-annotations.version>1.3.9-1</findbugs-annotations.version>

View File

@@ -84,7 +84,7 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
       boolean isValid = true;
       if (uri == null || uri.isEmpty()) {
         isValid = false;
-      } else {
+      } else if (!uri.contains("$")) {
         try {
           new URIBuilder(URI.create(uri)).build();
         } catch (RuntimeException e) {
@@ -161,10 +161,12 @@ abstract class AbstractKiteProcessor extends AbstractProcessor {
           context.getProperty(CONF_XML_FILES).getValue());
       String error = null;
-      try {
+      if (!uri.contains("$")) {
+        try {
           getSchema(uri, conf);
         } catch (SchemaNotFoundException e) {
           error = e.getMessage();
+        }
       }
       return new ValidationResult.Builder()
           .subject(subject)
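The two uri.contains("$") guards above share one idea: a value holding an unresolved
Expression Language reference (anything with '$', e.g. ${schema.uri}) cannot be checked at
configuration time, so validation is deferred until attributes are substituted per FlowFile.
A standalone illustration of the guard, using a hypothetical helper rather than the
processor's own code:

/** Hypothetical helper mirroring the guard added in AbstractKiteProcessor. */
final class SchemaUriGuard {

  /**
   * Returns true only for literal values that can be resolved eagerly at
   * configuration time; values containing '$' are Expression Language
   * references and must wait for a FlowFile's attributes.
   */
  static boolean canValidateEagerly(String value) {
    return value != null && !value.isEmpty() && !value.contains("$");
  }

  private SchemaUriGuard() {
  }
}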

View File

@@ -48,8 +48,9 @@ import org.apache.nifi.processor.util.StandardValidators;
 import org.kitesdk.data.DatasetException;
 import org.kitesdk.data.DatasetIOException;
 import org.kitesdk.data.DatasetRecordException;
+import org.kitesdk.data.SchemaNotFoundException;
 import org.kitesdk.data.spi.DefaultConfiguration;
-import org.kitesdk.data.spi.filesystem.CSVFileReaderFixed;
+import org.kitesdk.data.spi.filesystem.CSVFileReader;
 import org.kitesdk.data.spi.filesystem.CSVProperties;

 import static org.apache.nifi.processor.util.StandardValidators.createLongValidator;
@@ -90,6 +91,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
       .description(
           "Outgoing Avro schema for each record created from a CSV row")
       .addValidator(SCHEMA_VALIDATOR)
+      .expressionLanguageSupported(true)
       .required(true)
       .build();
@@ -201,9 +203,17 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
       return;
     }

-    final Schema schema = getSchema(
-        context.getProperty(SCHEMA).getValue(),
-        DefaultConfiguration.get());
+    String schemaProperty = context.getProperty(SCHEMA)
+        .evaluateAttributeExpressions(flowFile)
+        .getValue();
+    final Schema schema;
+    try {
+      schema = getSchema(schemaProperty, DefaultConfiguration.get());
+    } catch (SchemaNotFoundException e) {
+      getLogger().error("Cannot find schema: " + schemaProperty);
+      session.transfer(flowFile, FAILURE);
+      return;
+    }

     final DataFileWriter<Record> writer = new DataFileWriter<>(
         AvroUtil.newDatumWriter(schema, Record.class));
@@ -215,7 +225,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
         public void process(InputStream in, OutputStream out) throws IOException {
           long written = 0L;
           long errors = 0L;
-          try (CSVFileReaderFixed<Record> reader = new CSVFileReaderFixed<>(
+          try (CSVFileReader<Record> reader = new CSVFileReader<>(
               in, props, schema, Record.class)) {
             reader.initialize();
             try (DataFileWriter<Record> w = writer.create(schema, out)) {
@@ -247,11 +257,6 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
     } catch (DatasetException e) {
       getLogger().error("Failed to read FlowFile", e);
       session.transfer(flowFile, FAILURE);
-    } catch (Throwable t) {
-      getLogger().error("Unknown Throwable", t);
-      session.rollback(true); // penalize just in case
-      context.yield();
     }
   }
 }
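With the patched copy gone, the processor relies on Kite 1.0.0's own
org.kitesdk.data.spi.filesystem.CSVFileReader, constructed from an InputStream exactly as in
the diff above. A rough standalone sketch of that reader outside the processor; the schema,
CSV options, and sample input are placeholders, not taken from this commit:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.kitesdk.data.spi.filesystem.CSVFileReader;
import org.kitesdk.data.spi.filesystem.CSVProperties;

public class CsvReaderSketch {

  public static void main(String[] args) throws Exception {
    // Placeholder record schema matching the two CSV columns below.
    Schema schema = SchemaBuilder.record("User").fields()
        .requiredString("name")
        .requiredLong("id")
        .endRecord();

    CSVProperties props = new CSVProperties.Builder().build();
    InputStream in = new ByteArrayInputStream(
        "alice,1\nbob,2\n".getBytes(StandardCharsets.UTF_8));

    // Same constructor the processor now uses: stream, CSV options, schema, record class.
    try (CSVFileReader<Record> reader = new CSVFileReader<>(in, props, schema, Record.class)) {
      reader.initialize();
      while (reader.hasNext()) {
        Record record = reader.next();
        System.out.println(record);
      }
    }
  }
}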

View File

@@ -42,7 +42,9 @@ import org.apache.nifi.processor.io.StreamCallback;
 import org.kitesdk.data.DatasetException;
 import org.kitesdk.data.DatasetIOException;
 import org.kitesdk.data.DatasetRecordException;
+import org.kitesdk.data.SchemaNotFoundException;
 import org.kitesdk.data.spi.DefaultConfiguration;
+import org.kitesdk.data.spi.filesystem.JSONFileReader;

 @Tags({"kite", "json", "avro"})
 @CapabilityDescription(
@@ -66,6 +68,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
       .description(
           "Outgoing Avro schema for each record created from a JSON object")
       .addValidator(SCHEMA_VALIDATOR)
+      .expressionLanguageSupported(true)
       .required(true)
       .build();
@@ -102,9 +105,17 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
       return;
     }

-    final Schema schema = getSchema(
-        context.getProperty(SCHEMA).getValue(),
-        DefaultConfiguration.get());
+    String schemaProperty = context.getProperty(SCHEMA)
+        .evaluateAttributeExpressions(flowFile)
+        .getValue();
+    final Schema schema;
+    try {
+      schema = getSchema(schemaProperty, DefaultConfiguration.get());
+    } catch (SchemaNotFoundException e) {
+      getLogger().error("Cannot find schema: " + schemaProperty);
+      session.transfer(flowFile, FAILURE);
+      return;
+    }

     final DataFileWriter<Record> writer = new DataFileWriter<>(
         AvroUtil.newDatumWriter(schema, Record.class));
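ConvertJSONToAvro gets the same treatment: the import above switches to Kite 1.0.0's
org.kitesdk.data.spi.filesystem.JSONFileReader, whose InputStream constructor is what the
0.18.0 release NPE'd on. A rough sketch of using it directly, assuming the Kite reader keeps
the same (InputStream, Schema, Class) constructor as the removed local copy; schema and input
are placeholders:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData.Record;
import org.kitesdk.data.spi.filesystem.JSONFileReader;

public class JsonReaderSketch {

  public static void main(String[] args) {
    // Placeholder record schema for the JSON objects below.
    Schema schema = SchemaBuilder.record("User").fields()
        .requiredString("name")
        .endRecord();

    InputStream in = new ByteArrayInputStream(
        "{\"name\":\"alice\"}\n{\"name\":\"bob\"}\n".getBytes(StandardCharsets.UTF_8));

    // InputStream-based construction, the case fixed in Kite 1.0.0.
    try (JSONFileReader<Record> reader = new JSONFileReader<>(in, schema, Record.class)) {
      reader.initialize();
      while (reader.hasNext()) {
        System.out.println(reader.next());
      }
    }
  }
}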

View File

@@ -1,114 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.JsonUtil;
import org.kitesdk.data.spi.ReaderWriterState;
/**
* This is a temporary addition. The version in 0.18.0 throws a NPE when the
* InputStream constructor is used.
*/
class JSONFileReader<E> extends AbstractDatasetReader<E> {
private final GenericData model;
private final Schema schema;
private InputStream incoming = null;
// state
private ReaderWriterState state = ReaderWriterState.NEW;
private Iterator<E> iterator;
public JSONFileReader(InputStream incoming, Schema schema, Class<E> type) {
this.incoming = incoming;
this.schema = schema;
this.model = DataModelUtil.getDataModelForType(type);
this.state = ReaderWriterState.NEW;
}
@Override
public void initialize() {
Preconditions.checkState(state.equals(ReaderWriterState.NEW),
"A reader may not be opened more than once - current state:%s", state);
Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
"Schemas for JSON files should be record");
this.iterator = Iterators.transform(JsonUtil.parser(incoming),
new Function<JsonNode, E>() {
@Override
@SuppressWarnings("unchecked")
public E apply(@Nullable JsonNode node) {
return (E) JsonUtil.convertToAvro(model, node, schema);
}
});
this.state = ReaderWriterState.OPEN;
}
@Override
public boolean hasNext() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
return iterator.hasNext();
}
@Override
public E next() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
return iterator.next();
}
@Override
public void close() {
if (!state.equals(ReaderWriterState.OPEN)) {
return;
}
iterator = null;
try {
incoming.close();
} catch (IOException e) {
throw new DatasetIOException("Unable to close reader path", e);
}
state = ReaderWriterState.CLOSED;
}
@Override
public boolean isOpen() {
return (this.state == ReaderWriterState.OPEN);
}
}

View File

@@ -47,7 +47,7 @@ import org.kitesdk.data.ValidationException;
 import org.kitesdk.data.View;
 import org.kitesdk.data.spi.SchemaValidationUtil;

-@Tags({"kite", "avro", "parquet", "hive", "hdfs", "hbase"})
+@Tags({"kite", "avro", "parquet", "hadoop", "hive", "hdfs", "hbase"})
 @CapabilityDescription("Stores Avro records in a Kite dataset")
 public class StoreInKiteDataset extends AbstractKiteProcessor {
   private static final Relationship SUCCESS = new Relationship.Builder()
@@ -151,11 +151,6 @@ public class StoreInKiteDataset extends AbstractKiteProcessor {
       getLogger().error(e.getMessage());
       getLogger().debug("Incompatible schema error", e);
       session.transfer(flowFile, INCOMPATIBLE);
-    } catch (Throwable t) {
-      getLogger().error("Unknown Throwable", t);
-      session.rollback(true); // penalize just in case
-      context.yield();
     }
   }

View File

@@ -1,172 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.kitesdk.data.spi.filesystem;
import au.com.bytecode.opencsv.CSVReader;
import java.io.InputStream;
import com.google.common.collect.Lists;
import java.util.List;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.ReaderWriterState;
import com.google.common.base.Preconditions;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.spi.filesystem.CSVProperties;
import org.kitesdk.data.spi.filesystem.CSVUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.NoSuchElementException;
import static org.kitesdk.data.spi.filesystem.FileSystemProperties.REUSE_RECORDS;
/**
* This is a temporary addition. The version in 0.18.0 throws a NPE when the
* InputStream constructor is used.
*/
public class CSVFileReaderFixed<E> extends AbstractDatasetReader<E> {
private final CSVProperties props;
private final Schema schema;
private final boolean reuseRecords;
private final Class<E> recordClass;
private CSVReader reader = null;
private CSVRecordBuilder<E> builder;
private InputStream incoming = null;
// state
private ReaderWriterState state = ReaderWriterState.NEW;
private boolean hasNext = false;
private String[] next = null;
private E record = null;
public CSVFileReaderFixed(InputStream incoming, CSVProperties props,
Schema schema, Class<E> type) {
this.incoming = incoming;
this.schema = schema;
this.recordClass = type;
this.state = ReaderWriterState.NEW;
this.props = props;
this.reuseRecords = false;
Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
"Schemas for CSV files must be records of primitive types");
}
@Override
@SuppressWarnings("unchecked")
public void initialize() {
Preconditions.checkState(state.equals(ReaderWriterState.NEW),
"A reader may not be opened more than once - current state:%s", state);
this.reader = CSVUtil.newReader(incoming, props);
List<String> header = null;
if (props.useHeader) {
this.hasNext = advance();
header = Lists.newArrayList(next);
} else if (props.header != null) {
try {
header = Lists.newArrayList(
CSVUtil.newParser(props).parseLine(props.header));
} catch (IOException e) {
throw new DatasetIOException(
"Failed to parse header from properties: " + props.header, e);
}
}
this.builder = new CSVRecordBuilder<E>(schema, recordClass, header);
// initialize by reading the first record
this.hasNext = advance();
this.state = ReaderWriterState.OPEN;
}
@Override
public boolean hasNext() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
return hasNext;
}
@Override
public E next() {
Preconditions.checkState(state.equals(ReaderWriterState.OPEN),
"Attempt to read from a file in state:%s", state);
if (!hasNext) {
throw new NoSuchElementException();
}
try {
if (reuseRecords) {
this.record = builder.makeRecord(next, record);
return record;
} else {
return builder.makeRecord(next, null);
}
} finally {
this.hasNext = advance();
}
}
private boolean advance() {
try {
next = reader.readNext();
} catch (IOException e) {
throw new DatasetIOException("Could not read record", e);
}
return (next != null);
}
@Override
public void close() {
if (!state.equals(ReaderWriterState.OPEN)) {
return;
}
try {
reader.close();
} catch (IOException e) {
throw new DatasetIOException("Unable to close reader", e);
}
state = ReaderWriterState.CLOSED;
}
@Override
public boolean isOpen() {
return (this.state == ReaderWriterState.OPEN);
}
}

View File

@@ -66,6 +66,9 @@ public class TestGetSchema {
   @Ignore("Does not work on windows")
   public void testSchemaFromKiteURIs() throws IOException {
     String location = temp.newFolder("ns", "temp").toString();
+    if (location.endsWith("/")) {
+      location = location.substring(0, location.length() - 1);
+    }
     String datasetUri = "dataset:" + location;
     DatasetDescriptor descriptor = new DatasetDescriptor.Builder()
         .schema(SCHEMA)