NIFI-5872 - Added compression option to JsonRecordSetWriter

This closes #3208.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Pierre Villard 2018-12-06 09:50:20 +01:00 committed by Koji Kawamura
parent 60064a9f68
commit a6f91a1975
6 changed files with 191 additions and 25 deletions

View File

@ -519,6 +519,8 @@
<exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>
<exclude>src/test/resources/TestForkRecord/schema/extract-schema.avsc</exclude>
<exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
<exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -17,7 +17,21 @@
package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
@ -25,8 +39,7 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import org.xerial.snappy.SnappyInputStream;
public class TestConvertRecord {
@ -161,4 +174,48 @@ public class TestConvertRecord {
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
assertTrue(original == out);
}
@Test
public void testJSONCompression() throws InitializationException, IOException {
final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
runner.enableControllerService(jsonReader);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
runner.setProperty(jsonWriter, "compression-format", "snappy");
runner.enableControllerService(jsonWriter);
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));
runner.setProperty(ConvertRecord.RECORD_READER, "reader");
runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
runner.run();
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteSQL.REL_SUCCESS).get(0);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray())); final OutputStream out = baos) {
final byte[] buffer = new byte[8192]; int len;
while ((len = sis.read(buffer)) > 0) {
out.write(buffer, 0, len);
}
out.flush();
}
assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json"))), baos.toString(StandardCharsets.UTF_8.name()));
}
}

View File

@ -0,0 +1,7 @@
[ {
"id" : 485,
"name" : {
"last" : "Doe",
"first" : "John"
}
} ]

View File

@ -0,0 +1,17 @@
{
"name": "personWithNameRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name", "type": {
"type": "record",
"name": "nameRecord",
"fields": [
{ "name": "last", "type": "string" },
{ "name": "first", "type": "string" }
]
}
}
]
}

View File

@ -17,13 +17,15 @@
package org.apache.nifi.json;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.nifi.record.NullSuppression;
import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -34,45 +36,58 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.record.NullSuppression;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.tukaani.xz.LZMA2Options;
import org.tukaani.xz.XZOutputStream;
import org.xerial.snappy.SnappyFramedOutputStream;
import org.xerial.snappy.SnappyOutputStream;
@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
@CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
+ "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
+ "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
"Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
"When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
"When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
static final AllowableValue OUTPUT_ARRAY = new AllowableValue("output-array", "Array",
"Output records as a JSON array");
static final AllowableValue OUTPUT_ONELINE = new AllowableValue("output-oneline", "One Line Per Object",
"Output records with one JSON object per line, delimited by a newline character");
public static final String COMPRESSION_FORMAT_GZIP = "gzip";
public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
public static final String COMPRESSION_FORMAT_NONE = "none";
static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
.name("suppress-nulls")
.displayName("Suppress Null Values")
.description("Specifies how the writer should handle a null field")
.allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
.defaultValue(NEVER_SUPPRESS.getValue())
.required(true)
.build();
.name("suppress-nulls")
.displayName("Suppress Null Values")
.description("Specifies how the writer should handle a null field")
.allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
.defaultValue(NEVER_SUPPRESS.getValue())
.required(true)
.build();
static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
.name("Pretty Print JSON")
.description("Specifies whether or not the JSON should be pretty printed")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
.name("Pretty Print JSON")
.description("Specifies whether or not the JSON should be pretty printed")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("false")
.required(true)
.build();
static final PropertyDescriptor OUTPUT_GROUPING = new PropertyDescriptor.Builder()
.name("output-grouping")
.displayName("Output Grouping")
@ -82,10 +97,30 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
.defaultValue(OUTPUT_ARRAY.getValue())
.required(true)
.build();
public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
.name("compression-format")
.displayName("Compression Format")
.description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, LZMA, Snappy, and Snappy Framed")
.allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2,
COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
.defaultValue(COMPRESSION_FORMAT_NONE)
.required(true)
.build();
public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
.name("compression-level")
.displayName("Compression Level")
.description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing "
+ "but less compression; a value of 0 indicates no compression but simply archiving")
.defaultValue("1")
.required(true)
.allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
.build();
private volatile boolean prettyPrint;
private volatile NullSuppression nullSuppression;
private volatile OutputGrouping outputGrouping;
private volatile String compressionFormat;
private volatile int compressionLevel;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -93,6 +128,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
properties.add(PRETTY_PRINT_JSON);
properties.add(SUPPRESS_NULLS);
properties.add(OUTPUT_GROUPING);
properties.add(COMPRESSION_FORMAT);
properties.add(COMPRESSION_LEVEL);
return properties;
}
@ -130,12 +167,50 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
grouping = OutputGrouping.OUTPUT_ARRAY;
}
this.outputGrouping = grouping;
this.compressionFormat = context.getProperty(COMPRESSION_FORMAT).getValue();
this.compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, outputGrouping,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
final OutputStream bufferedOut = new BufferedOutputStream(out, 65536);
final OutputStream compressionOut;
String mimeTypeRef;
try {
switch (compressionFormat.toLowerCase()) {
case COMPRESSION_FORMAT_GZIP:
compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
mimeTypeRef = "application/gzip";
break;
case COMPRESSION_FORMAT_XZ_LZMA2:
compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
mimeTypeRef = "application/x-xz";
break;
case COMPRESSION_FORMAT_SNAPPY:
compressionOut = new SnappyOutputStream(bufferedOut);
mimeTypeRef = "application/x-snappy";
break;
case COMPRESSION_FORMAT_SNAPPY_FRAMED:
compressionOut = new SnappyFramedOutputStream(bufferedOut);
mimeTypeRef = "application/x-snappy-framed";
break;
case COMPRESSION_FORMAT_BZIP2:
mimeTypeRef = "application/x-bzip2";
compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
break;
default:
mimeTypeRef = "application/json";
compressionOut = out;
}
} catch (CompressorException e) {
throw new IOException(e);
}
return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
}
}

View File

@ -60,9 +60,16 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
private String mimeType = "application/json";
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json");
}
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
final String mimeType) throws IOException {
super(out);
this.logger = logger;
@ -70,6 +77,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
this.schemaAccess = schemaAccess;
this.nullSuppression = nullSuppression;
this.outputGrouping = outputGrouping;
this.mimeType = mimeType;
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
@ -399,7 +407,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
@Override
public String getMimeType() {
return "application/json";
return this.mimeType;
}
private static interface GeneratorTask {