From a6f91a197516128a1809c7563efaaf3e5fe63a17 Mon Sep 17 00:00:00 2001
From: Pierre Villard
Date: Thu, 6 Dec 2018 09:50:20 +0100
Subject: [PATCH] NIFI-5872 - Added compression option to JsonRecordSetWriter

This closes #3208.

Signed-off-by: Koji Kawamura
---
 .../nifi-standard-processors/pom.xml           |   2 +
 .../standard/TestConvertRecord.java            |  61 ++++++++-
 .../TestConvertRecord/input/person.json        |   7 ++
 .../TestConvertRecord/schema/person.avsc       |  17 +++
 .../apache/nifi/json/JsonRecordSetWriter.java  | 117 ++++++++++++++----
 .../org/apache/nifi/json/WriteJsonResult.java  |  12 +-
 6 files changed, 191 insertions(+), 25 deletions(-)
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
 create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 620c570c05..3ac6f191d6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -519,6 +519,8 @@
                         src/test/resources/TestForkRecord/output/split-transactions.json
                         src/test/resources/TestForkRecord/schema/extract-schema.avsc
                         src/test/resources/TestForkRecord/schema/schema.avsc
+                        src/test/resources/TestConvertRecord/schema/person.avsc
+                        src/test/resources/TestConvertRecord/input/person.json
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 7d3f3166d6..eba08354c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -17,7 +17,21 @@
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -25,8 +39,7 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
+import org.xerial.snappy.SnappyInputStream;
 
 public class TestConvertRecord {
 
@@ -161,4 +174,48 @@ public class TestConvertRecord {
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
         assertTrue(original == out);
     }
+
+    @Test
+    public void testJSONCompression() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+
+        final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+        final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.setProperty(jsonWriter, "compression-format", "snappy");
+        runner.enableControllerService(jsonWriter);
+
+        runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person.json"));
+
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray()));
+             final OutputStream out = baos) {
+            final byte[] buffer = new byte[8192];
+            int len;
+            while ((len = sis.read(buffer)) > 0) {
+                out.write(buffer, 0, len);
+            }
+            out.flush();
+        }
+
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json"))), baos.toString(StandardCharsets.UTF_8.name()));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
new file mode 100644
index 0000000000..e153afedf6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
@@ -0,0 +1,7 @@
+[ {
+  "id" : 485,
+  "name" : {
+    "last" : "Doe",
+    "first" : "John"
+  }
+} ]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
new file mode 100644
index 0000000000..82713ea72d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
@@ -0,0 +1,17 @@
+{
+  "name": "personWithNameRecord",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [
+    { "name": "id", "type": "int" },
+    { "name": "name", "type": {
+        "type": "record",
+        "name": "nameRecord",
+        "fields": [
+          { "name": "last", "type": "string" },
+          { "name": "first", "type": "string" }
+        ]
+      }
+    }
+  ]
+}
\ No newline at end of file
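For context, the round trip the new test exercises can be reproduced in isolation. The following is a minimal sketch (not part of the patch) that compresses and restores a JSON string with the same snappy-java streams the writer and test use; the class name is illustrative only, and it assumes only snappy-java on the classpath.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;

// Illustrative only; this class is not part of the patch.
public class SnappyJsonRoundTrip {

    public static void main(final String[] args) throws IOException {
        final String json = "[ {\n  \"id\" : 485\n} ]";

        // Compress, as JsonRecordSetWriter does when compression-format is "snappy".
        final ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (SnappyOutputStream sos = new SnappyOutputStream(compressed)) {
            sos.write(json.getBytes(StandardCharsets.UTF_8));
        }

        // Decompress, as the test above does with SnappyInputStream.
        final ByteArrayOutputStream restored = new ByteArrayOutputStream();
        try (SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
            final byte[] buffer = new byte[8192];
            int len;
            while ((len = sis.read(buffer)) > 0) {
                restored.write(buffer, 0, len);
            }
        }

        System.out.println(json.equals(new String(restored.toByteArray(), StandardCharsets.UTF_8))); // true
    }
}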
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index e2b417d45b..b61586ed34 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -17,13 +17,15 @@
 package org.apache.nifi.json;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.nifi.record.NullSuppression;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -34,45 +36,58 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.record.NullSuppression;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.tukaani.xz.LZMA2Options;
+import org.tukaani.xz.XZOutputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+import org.xerial.snappy.SnappyOutputStream;
 
 @Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
 @CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
-    + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
+        + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
 public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
 
     static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
-        "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
+            "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
     static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
-        "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
+            "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
     static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
-        "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
+            "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
 
     static final AllowableValue OUTPUT_ARRAY = new AllowableValue("output-array", "Array", "Output records as a JSON array");
     static final AllowableValue OUTPUT_ONELINE = new AllowableValue("output-oneline", "One Line Per Object", "Output records with one JSON object per line, delimited by a newline character");
 
+    public static final String COMPRESSION_FORMAT_GZIP = "gzip";
+    public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
+    public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
+    public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
+    public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
+    public static final String COMPRESSION_FORMAT_NONE = "none";
+
     static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
-        .name("suppress-nulls")
-        .displayName("Suppress Null Values")
-        .description("Specifies how the writer should handle a null field")
-        .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
-        .defaultValue(NEVER_SUPPRESS.getValue())
-        .required(true)
-        .build();
+            .name("suppress-nulls")
+            .displayName("Suppress Null Values")
+            .description("Specifies how the writer should handle a null field")
+            .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
+            .defaultValue(NEVER_SUPPRESS.getValue())
+            .required(true)
+            .build();
     static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
-        .name("Pretty Print JSON")
-        .description("Specifies whether or not the JSON should be pretty printed")
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-        .allowableValues("true", "false")
-        .defaultValue("false")
-        .required(true)
-        .build();
+            .name("Pretty Print JSON")
+            .description("Specifies whether or not the JSON should be pretty printed")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
     static final PropertyDescriptor OUTPUT_GROUPING = new PropertyDescriptor.Builder()
         .name("output-grouping")
         .displayName("Output Grouping")
@@ -82,10 +97,30 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
         .defaultValue(OUTPUT_ARRAY.getValue())
         .required(true)
         .build();
+    public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
+            .name("compression-format")
+            .displayName("Compression Format")
+            .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, Snappy, and Snappy Framed")
+            .allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2,
+                COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
+            .defaultValue(COMPRESSION_FORMAT_NONE)
+            .required(true)
+            .build();
+    public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
+            .name("compression-level")
+            .displayName("Compression Level")
+            .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing "
+                + "but less compression; a value of 0 indicates no compression but simply archiving")
+            .defaultValue("1")
+            .required(true)
+            .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+            .build();
 
     private volatile boolean prettyPrint;
     private volatile NullSuppression nullSuppression;
     private volatile OutputGrouping outputGrouping;
+    private volatile String compressionFormat;
+    private volatile int compressionLevel;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -93,6 +128,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
         properties.add(PRETTY_PRINT_JSON);
         properties.add(SUPPRESS_NULLS);
         properties.add(OUTPUT_GROUPING);
+        properties.add(COMPRESSION_FORMAT);
+        properties.add(COMPRESSION_LEVEL);
 
         return properties;
     }
@@ -130,12 +167,50 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
             grouping = OutputGrouping.OUTPUT_ARRAY;
         }
         this.outputGrouping = grouping;
+
+        this.compressionFormat = context.getProperty(COMPRESSION_FORMAT).getValue();
+        this.compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
     }
 
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
-        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, outputGrouping,
-            getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
+
+        final OutputStream bufferedOut = new BufferedOutputStream(out, 65536);
+        final OutputStream compressionOut;
+        String mimeTypeRef;
+
+        try {
+            switch (compressionFormat.toLowerCase()) {
+                case COMPRESSION_FORMAT_GZIP:
+                    compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
+                    mimeTypeRef = "application/gzip";
+                    break;
+                case COMPRESSION_FORMAT_XZ_LZMA2:
+                    compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
+                    mimeTypeRef = "application/x-xz";
+                    break;
+                case COMPRESSION_FORMAT_SNAPPY:
+                    compressionOut = new SnappyOutputStream(bufferedOut);
+                    mimeTypeRef = "application/x-snappy";
+                    break;
+                case COMPRESSION_FORMAT_SNAPPY_FRAMED:
+                    compressionOut = new SnappyFramedOutputStream(bufferedOut);
+                    mimeTypeRef = "application/x-snappy-framed";
+                    break;
+                case COMPRESSION_FORMAT_BZIP2:
+                    mimeTypeRef = "application/x-bzip2";
+                    compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
+                    break;
+                default:
+                    mimeTypeRef = "application/json";
+                    compressionOut = out;
+            }
+        } catch (CompressorException e) {
+            throw new IOException(e);
+        }
+
+        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
+            getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 5708f5e7e7..4ce6cebd35 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -60,9 +60,16 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     private final Supplier<DateFormat> LAZY_DATE_FORMAT;
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+    private String mimeType = "application/json";
 
     public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
-        final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+            final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+        this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json");
+    }
+
+    public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
+            final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
+            final String mimeType) throws IOException {
 
         super(out);
         this.logger = logger;
@@ -70,6 +77,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
         this.schemaAccess = schemaAccess;
         this.nullSuppression = nullSuppression;
         this.outputGrouping = outputGrouping;
+        this.mimeType = mimeType;
 
         final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
         final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
@@ -399,7 +407,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
 
     @Override
     public String getMimeType() {
-        return "application/json";
+        return this.mimeType;
     }
 
     private static interface GeneratorTask {
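Because the writer now reports a compression-specific mime.type, a downstream consumer can select the matching decompression stream from that attribute alone. The following is a minimal sketch, not part of the patch, assuming the same snappy-java, XZ, and commons-compress libraries the writer itself uses; the helper class and method names are hypothetical.

import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.tukaani.xz.XZInputStream;
import org.xerial.snappy.SnappyFramedInputStream;
import org.xerial.snappy.SnappyInputStream;

// Hypothetical helper for downstream consumers; not part of the patch.
public final class JsonDecompression {

    // Wraps the raw content stream based on the mime.type attribute
    // that JsonRecordSetWriter sets in createWriter above.
    public static InputStream wrap(final String mimeType, final InputStream in) throws IOException {
        switch (mimeType) {
            case "application/gzip":
                return new GZIPInputStream(in);
            case "application/x-xz":
                return new XZInputStream(in);
            case "application/x-snappy":
                return new SnappyInputStream(in);
            case "application/x-snappy-framed":
                return new SnappyFramedInputStream(in);
            case "application/x-bzip2":
                try {
                    return new CompressorStreamFactory().createCompressorInputStream(CompressorStreamFactory.BZIP2, in);
                } catch (final CompressorException e) {
                    throw new IOException(e);
                }
            default: // "application/json": uncompressed output
                return in;
        }
    }

    private JsonDecompression() {
    }
}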