From 75d0c74d273600629d7a6e7027196c39b66513bb Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 3 May 2016 17:00:49 +0200 Subject: [PATCH] NIFI-1840 Added compression type property in Kite processors This closes #409 --- .../kite/AbstractKiteConvertProcessor.java | 62 +++++++++++++++++ .../processors/kite/ConvertAvroSchema.java | 22 ++++-- .../processors/kite/ConvertCSVToAvro.java | 6 +- .../processors/kite/ConvertJSONToAvro.java | 12 +++- .../kite/TestCSVToAvroProcessor.java | 31 ++++++++- .../kite/TestConvertAvroSchema.java | 68 +++++++++++++++++++ .../kite/TestJSONToAvroProcessor.java | 30 ++++++++ 7 files changed, 219 insertions(+), 12 deletions(-) create mode 100644 nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java new file mode 100644 index 0000000000..561bf46d38 --- /dev/null +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import org.apache.avro.file.CodecFactory; +import org.apache.nifi.components.PropertyDescriptor; + +import com.google.common.annotations.VisibleForTesting; + +abstract class AbstractKiteConvertProcessor extends AbstractKiteProcessor { + + @VisibleForTesting + static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() + .name("kite-compression-type") + .displayName("Compression type") + .description("Compression type to use when writing Avro files. Default is Snappy.") + .allowableValues(CodecType.values()) + .defaultValue(CodecType.SNAPPY.toString()) + .build(); + + public enum CodecType { + BZIP2, + DEFLATE, + NONE, + SNAPPY, + LZO + } + + protected CodecFactory getCodecFactory(String property) { + CodecType type = CodecType.valueOf(property); + switch (type) { + case BZIP2: + return CodecFactory.bzip2Codec(); + case DEFLATE: + return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL); + case NONE: + return CodecFactory.nullCodec(); + case LZO: + return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL); + case SNAPPY: + default: + return CodecFactory.snappyCodec(); + } + } + +} diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java index a8244d2129..a3fffc34ff 100644 ---
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java @@ -30,7 +30,6 @@ import java.util.Set; import java.util.regex.Pattern; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileStream; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; @@ -70,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong; @DynamicProperty(name = "Field name from input schema", value = "Field name for output schema", description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id") -public class ConvertAvroSchema extends AbstractKiteProcessor { +public class ConvertAvroSchema extends AbstractKiteConvertProcessor { private static final Relationship SUCCESS = new Relationship.Builder() .name("success") @@ -180,7 +179,9 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { . builder() .add(INPUT_SCHEMA) .add(OUTPUT_SCHEMA) - .add(LOCALE).build(); + .add(LOCALE) + .add(COMPRESSION_TYPE) + .build(); private static final Set RELATIONSHIPS = ImmutableSet . 
builder().add(SUCCESS).add(FAILURE).build(); @@ -284,11 +285,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { final DataFileWriter writer = new DataFileWriter<>( AvroUtil.newDatumWriter(outputSchema, Record.class)); - writer.setCodec(CodecFactory.snappyCodec()); + writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); final DataFileWriter failureWriter = new DataFileWriter<>( AvroUtil.newDatumWriter(outputSchema, Record.class)); - failureWriter.setCodec(CodecFactory.snappyCodec()); + failureWriter.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); try { final AtomicLong written = new AtomicLong(0L); @@ -376,6 +377,17 @@ public class ConvertAvroSchema extends AbstractKiteProcessor { } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); session.transfer(incomingAvro, FAILURE); + } finally { + try { + writer.close(); + } catch (IOException e) { + getLogger().warn("Unable to close writer resource", e); + } + try { + failureWriter.close(); + } catch (IOException e) { + getLogger().warn("Unable to close writer resource", e); + } } } } diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index de4130ff67..22244eedcc 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Set; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; import org.apache.commons.lang3.StringEscapeUtils; @@ -63,7
+62,7 @@ import java.util.concurrent.atomic.AtomicLong; @Tags({"kite", "csv", "avro"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts CSV files to Avro according to an Avro Schema") -public class ConvertCSVToAvro extends AbstractKiteProcessor { +public class ConvertCSVToAvro extends AbstractKiteConvertProcessor { private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build(); @@ -164,6 +163,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { .add(ESCAPE) .add(HAS_HEADER) .add(LINES_TO_SKIP) + .add(COMPRESSION_TYPE) .build(); private static final Set RELATIONSHIPS = ImmutableSet. builder() @@ -221,7 +221,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { } try (final DataFileWriter writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) { - writer.setCodec(CodecFactory.snappyCodec()); + writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); try { final AtomicLong written = new AtomicLong(0L); diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java index 6245362d06..1127a2de54 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Set; import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -54,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong; @Tags({"kite", "json", "avro"}) 
@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts JSON files to Avro according to an Avro Schema") -public class ConvertJSONToAvro extends AbstractKiteProcessor { +public class ConvertJSONToAvro extends AbstractKiteConvertProcessor { private static final Relationship SUCCESS = new Relationship.Builder() .name("success") @@ -85,6 +84,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { = ImmutableList.builder() .addAll(AbstractKiteProcessor.getProperties()) .add(SCHEMA) + .add(COMPRESSION_TYPE) .build(); private static final Set RELATIONSHIPS @@ -129,7 +129,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { final DataFileWriter writer = new DataFileWriter<>( AvroUtil.newDatumWriter(schema, Record.class)); - writer.setCodec(CodecFactory.snappyCodec()); + writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue())); try { final AtomicLong written = new AtomicLong(0L); @@ -200,6 +200,12 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); session.transfer(incomingJSON, FAILURE); + } finally { + try { + writer.close(); + } catch (IOException e) { + getLogger().warn("Unable to close writer resource", e); + } } } diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java index 902ec79b20..9252e81648 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java @@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import
org.apache.avro.SchemaBuilder; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -89,7 +90,6 @@ public class TestCSVToAvroProcessor { FAILURE_SUMMARY, incompatible.getAttribute("errors")); } - @Test public void testBasicConversion() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); @@ -118,6 +118,35 @@ public class TestCSVToAvroProcessor { FAILURE_SUMMARY, incompatible.getAttribute("errors")); } + @Test + public void testBasicConversionWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); + runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.DEFLATE.toString()); + runner.assertValid(); + + runner.enqueue(streamFor(CSV_CONTENT)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 row", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + String failureContent = new String(runner.getContentAsByteArray(incompatible), + StandardCharsets.UTF_8); + Assert.assertEquals("Should reject an invalid string and double", + CSV_CONTENT, failureContent); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + @Test public void testAlternateCharset() throws IOException { TestRunner runner = 
TestRunners.newTestRunner(ConvertCSVToAvro.class); diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java index 2da0513964..7a62ac5009 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java @@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; import org.apache.commons.lang.LocaleUtils; +import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -124,6 +125,73 @@ public class TestConvertAvroSchema { Assert.assertEquals(2, count); } + @Test + public void testBasicConversionWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class); + runner.assertNotValid(); + runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, INPUT_SCHEMA.toString()); + runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString()); + runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.BZIP2.toString()); + Locale locale = Locale.getDefault(); + runner.setProperty("primaryColor", "color"); + runner.assertValid(); + + NumberFormat format = NumberFormat.getInstance(locale); + + // Two valid rows, and one invalid because "free" is not a double. 
+ Record goodRecord1 = dataBasic("1", "blue", null, null); + Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5)); + Record badRecord = dataBasic("3", "red", "yellow", "free"); + List input = Lists.newArrayList(goodRecord1, goodRecord2, + badRecord); + + runner.enqueue(streamFor(input)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 1 rows", 1, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship( + "failure").get(0); + GenericDatumReader reader = new GenericDatumReader( + INPUT_SCHEMA); + DataFileStream stream = new DataFileStream( + new ByteArrayInputStream( + runner.getContentAsByteArray(incompatible)), reader); + int count = 0; + for (Record r : stream) { + Assert.assertEquals(badRecord, r); + count++; + } + stream.close(); + Assert.assertEquals(1, count); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + + GenericDatumReader successReader = new GenericDatumReader( + OUTPUT_SCHEMA); + DataFileStream successStream = new DataFileStream( + new ByteArrayInputStream(runner.getContentAsByteArray(runner + .getFlowFilesForRelationship("success").get(0))), + successReader); + count = 0; + for (Record r : successStream) { + if (count == 0) { + Assert.assertEquals(convertBasic(goodRecord1, locale), r); + } else { + Assert.assertEquals(convertBasic(goodRecord2, locale), r); + } + count++; + } + successStream.close(); + Assert.assertEquals(2, count); + } + @Test public void testBasicConversionWithLocales() throws IOException { testBasicConversionWithLocale("en_US"); diff --git 
a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java index e0b4a6fae8..776e2f3507 100644 --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -83,6 +84,35 @@ public class TestJSONToAvroProcessor { FAILURE_SUMMARY, incompatible.getAttribute("errors")); } + @Test + public void testBasicConversionWithCompression() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString()); + runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.NONE.toString()); + runner.assertValid(); + + runner.enqueue(streamFor(JSON_CONTENT)); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 3 rows", 3, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + String failureContent = new 
String(runner.getContentAsByteArray(incompatible), + StandardCharsets.UTF_8); + Assert.assertEquals("Should reject an invalid string and double", + JSON_CONTENT, failureContent); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + @Test public void testOnlyErrors() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);