NIFI-1840 Added compression type property in Kite processors

This closes #409
Pierre Villard 2016-05-03 17:00:49 +02:00 committed by Andre F de Miranda
parent 2c907c63af
commit 75d0c74d27
7 changed files with 219 additions and 12 deletions
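In short: the three Kite conversion processors (ConvertCSVToAvro, ConvertJSONToAvro, ConvertAvroSchema) previously hard-coded Snappy compression for their Avro output; this commit adds a "Compression type" property with Snappy as the default. A minimal configuration sketch using the class and property names from the diff below (mySchema is a stand-in for any org.apache.avro.Schema):

TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.setProperty(ConvertCSVToAvro.SCHEMA, mySchema.toString()); // mySchema: stand-in schema
// New in this commit: choose the Avro container codec; SNAPPY remains the default.
runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE,
        AbstractKiteConvertProcessor.CodecType.DEFLATE.toString());
runner.assertValid();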

View File

@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.kite;
import org.apache.avro.file.CodecFactory;
import org.apache.nifi.components.PropertyDescriptor;
import com.google.common.annotations.VisibleForTesting;
abstract class AbstractKiteConvertProcessor extends AbstractKiteProcessor {
@VisibleForTesting
static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("kite-compression-type")
.displayName("Compression type")
.description("Compression type to use when writing Avro files. Default is Snappy.")
.allowableValues(CodecType.values())
.defaultValue(CodecType.SNAPPY.toString())
.build();
public enum CodecType {
BZIP2,
DEFLATE,
NONE,
SNAPPY,
LZO
}
protected CodecFactory getCodecFactory(String property) {
CodecType type = CodecType.valueOf(property);
switch (type) {
case BZIP2:
return CodecFactory.bzip2Codec();
case DEFLATE:
return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
case NONE:
return CodecFactory.nullCodec();
case LZO:
// Avro's CodecFactory offers no LZO implementation, so this option currently falls back to XZ compression.
return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
case SNAPPY:
default:
return CodecFactory.snappyCodec();
}
}
}
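
For reference, getCodecFactory() relies only on Avro's built-in CodecFactory, and the returned codec must be applied to a DataFileWriter before create() is called. A standalone sketch (hypothetical CodecDemo class, not part of this commit) of the same pattern the processors use:

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class CodecDemo {
    public static void main(String[] args) throws IOException {
        Schema schema = SchemaBuilder.record("demo").fields()
                .requiredString("name").endRecord();
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "example");
        try (DataFileWriter<GenericRecord> writer =
                new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
            // Same pattern as the processors: set the codec before create().
            writer.setCodec(CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL));
            writer.create(schema, new File("demo.avro"));
            writer.append(record);
        }
    }
}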

View File

@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
@@ -70,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong;
@DynamicProperty(name = "Field name from input schema",
value = "Field name for output schema",
description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
public class ConvertAvroSchema extends AbstractKiteProcessor {
public class ConvertAvroSchema extends AbstractKiteConvertProcessor {
private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
@@ -180,7 +179,9 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
.<PropertyDescriptor> builder()
.add(INPUT_SCHEMA)
.add(OUTPUT_SCHEMA)
.add(LOCALE).build();
.add(LOCALE)
.add(COMPRESSION_TYPE)
.build();
private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
.<Relationship> builder().add(SUCCESS).add(FAILURE).build();
@@ -284,11 +285,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(outputSchema, Record.class));
writer.setCodec(CodecFactory.snappyCodec());
writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
AvroUtil.newDatumWriter(outputSchema, Record.class));
failureWriter.setCodec(CodecFactory.snappyCodec());
failureWriter.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
try {
final AtomicLong written = new AtomicLong(0L);
@@ -376,6 +377,17 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(incomingAvro, FAILURE);
} finally {
try {
writer.close();
} catch (IOException e) {
getLogger().warn("Unable to close writer resource", e);
}
try {
failureWriter.close();
} catch (IOException e) {
getLogger().warn("Unable to close writer resource", e);
}
}
}
}

View File

@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -63,7 +62,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Tags({"kite", "csv", "avro"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts CSV files to Avro according to an Avro Schema")
public class ConvertCSVToAvro extends AbstractKiteProcessor {
public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
@@ -164,6 +163,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
.add(ESCAPE)
.add(HAS_HEADER)
.add(LINES_TO_SKIP)
.add(COMPRESSION_TYPE)
.build();
private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
@@ -221,7 +221,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
}
try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
writer.setCodec(CodecFactory.snappyCodec());
writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
try {
final AtomicLong written = new AtomicLong(0L);

View File

@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -54,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Tags({"kite", "json", "avro"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts JSON files to Avro according to an Avro Schema")
public class ConvertJSONToAvro extends AbstractKiteProcessor {
public class ConvertJSONToAvro extends AbstractKiteConvertProcessor {
private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
@@ -85,6 +84,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
= ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(SCHEMA)
.add(COMPRESSION_TYPE)
.build();
private static final Set<Relationship> RELATIONSHIPS
@@ -129,7 +129,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(schema, Record.class));
writer.setCodec(CodecFactory.snappyCodec());
writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
try {
final AtomicLong written = new AtomicLong(0L);
@@ -200,6 +200,12 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(incomingJSON, FAILURE);
} finally {
try {
writer.close();
} catch (IOException e) {
getLogger().warn("Unable to close writer resource", e);
}
}
}

View File

@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -89,7 +90,6 @@ public class TestCSVToAvroProcessor {
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
}
@Test
public void testBasicConversion() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
@@ -118,6 +118,35 @@ public class TestCSVToAvroProcessor {
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
}
@Test
public void testBasicConversionWithCompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.assertNotValid();
runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.DEFLATE.toString());
runner.assertValid();
runner.enqueue(streamFor(CSV_CONTENT));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 row", 1, errors);
runner.assertTransferCount("success", 1);
runner.assertTransferCount("failure", 0);
runner.assertTransferCount("incompatible", 1);
MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
String failureContent = new String(runner.getContentAsByteArray(incompatible),
StandardCharsets.UTF_8);
Assert.assertEquals("Should reject an invalid string and double",
CSV_CONTENT, failureContent);
Assert.assertEquals("Should accumulate error messages",
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
}
@Test
public void testAlternateCharset() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);

View File

@@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.commons.lang.LocaleUtils;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -124,6 +125,73 @@ public class TestConvertAvroSchema {
Assert.assertEquals(2, count);
}
@Test
public void testBasicConversionWithCompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
runner.assertNotValid();
runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, INPUT_SCHEMA.toString());
runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString());
runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.BZIP2.toString());
Locale locale = Locale.getDefault();
runner.setProperty("primaryColor", "color");
runner.assertValid();
NumberFormat format = NumberFormat.getInstance(locale);
// Two valid rows, and one invalid because "free" is not a double.
Record goodRecord1 = dataBasic("1", "blue", null, null);
Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
Record badRecord = dataBasic("3", "red", "yellow", "free");
List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
badRecord);
runner.enqueue(streamFor(input));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 1 row", 1, errors);
runner.assertTransferCount("success", 1);
runner.assertTransferCount("failure", 1);
MockFlowFile incompatible = runner.getFlowFilesForRelationship(
"failure").get(0);
GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
INPUT_SCHEMA);
DataFileStream<Record> stream = new DataFileStream<Record>(
new ByteArrayInputStream(
runner.getContentAsByteArray(incompatible)), reader);
int count = 0;
for (Record r : stream) {
Assert.assertEquals(badRecord, r);
count++;
}
stream.close();
Assert.assertEquals(1, count);
Assert.assertEquals("Should accumulate error messages",
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
OUTPUT_SCHEMA);
DataFileStream<Record> successStream = new DataFileStream<Record>(
new ByteArrayInputStream(runner.getContentAsByteArray(runner
.getFlowFilesForRelationship("success").get(0))),
successReader);
count = 0;
for (Record r : successStream) {
if (count == 0) {
Assert.assertEquals(convertBasic(goodRecord1, locale), r);
} else {
Assert.assertEquals(convertBasic(goodRecord2, locale), r);
}
count++;
}
successStream.close();
Assert.assertEquals(2, count);
}
@Test
public void testBasicConversionWithLocales() throws IOException {
testBasicConversionWithLocale("en_US");

View File

@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -83,6 +84,35 @@ public class TestJSONToAvroProcessor {
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
}
@Test
public void testBasicConversionWithCompression() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
runner.assertNotValid();
runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.NONE.toString());
runner.assertValid();
runner.enqueue(streamFor(JSON_CONTENT));
runner.run();
long converted = runner.getCounterValue("Converted records");
long errors = runner.getCounterValue("Conversion errors");
Assert.assertEquals("Should convert 2 rows", 2, converted);
Assert.assertEquals("Should reject 3 rows", 3, errors);
runner.assertTransferCount("success", 1);
runner.assertTransferCount("failure", 0);
runner.assertTransferCount("incompatible", 1);
MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
String failureContent = new String(runner.getContentAsByteArray(incompatible),
StandardCharsets.UTF_8);
Assert.assertEquals("Should reject an invalid string and double",
JSON_CONTENT, failureContent);
Assert.assertEquals("Should accumulate error messages",
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
}
@Test
public void testOnlyErrors() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);