diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index fd09961fe5..a8459a9cdc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -20,6 +20,7 @@ package org.apache.nifi.avro; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.Collection; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; @@ -28,10 +29,13 @@ import java.util.Optional; import java.util.Set; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; @@ -48,6 +52,23 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement private static final Set requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); private static final int MAX_AVRO_SCHEMA_CACHE_SIZE = 20; + private enum CodecType { + BZIP2, + DEFLATE, + NONE, + SNAPPY, + LZO + } + + private static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder() + .name("compression-format") + .displayName("Compression Format") + .description("Compression type to use when writing Avro files. Default is None.") + .allowableValues(CodecType.values()) + .defaultValue(CodecType.NONE.toString()) + .required(true) + .build(); + private final Map compiledAvroSchemaCache = new LinkedHashMap() { @Override protected boolean removeEldestEntry(final Map.Entry eldest) { @@ -61,6 +82,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement @Override public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); + final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue(); try { final Schema avroSchema; @@ -80,7 +102,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement } if (AVRO_EMBEDDED.getValue().equals(strategyValue)) { - return new WriteAvroResultWithSchema(avroSchema, out); + return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat)); } else { return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out); } @@ -113,6 +135,30 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement } } + private CodecFactory getCodecFactory(String property) { + CodecType type = CodecType.valueOf(property); + switch (type) { + case BZIP2: + return CodecFactory.bzip2Codec(); + case DEFLATE: + return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL); + case LZO: + return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL); + case SNAPPY: + return CodecFactory.snappyCodec(); + case NONE: + default: + return CodecFactory.nullCodec(); + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(COMPRESSION_FORMAT); + return properties; + } + @Override protected List getSchemaWriteStrategyValues() { final List allowableValues = new ArrayList<>(); @@ -135,4 +181,22 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement return super.getRequiredSchemaFields(validationContext); } + + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(super.customValidate(validationContext)); + final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); + final String compressionFormatValue = validationContext.getProperty(COMPRESSION_FORMAT).getValue(); + if (!writeStrategyValue.equalsIgnoreCase(AVRO_EMBEDDED.getValue()) + && !CodecType.NONE.toString().equals(compressionFormatValue)) { + results.add(new ValidationResult.Builder() + .subject(COMPRESSION_FORMAT.getName()) + .valid(false) + .explanation("Avro compression codecs are stored in the header of the Avro file and therefore " + + "requires the header to be embedded into the content.") + .build()); + } + + return results; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java index ae2f109232..ea327a4b18 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithSchema.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.Map; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; @@ -34,12 +35,13 @@ public class WriteAvroResultWithSchema extends AbstractRecordSetWriter { private final DataFileWriter dataFileWriter; private final Schema schema; - public WriteAvroResultWithSchema(final Schema schema, final OutputStream out) throws IOException { + public WriteAvroResultWithSchema(final Schema schema, final OutputStream out, final CodecFactory codec) throws IOException { super(out); this.schema = schema; final GenericDatumWriter datumWriter = new GenericDatumWriter<>(schema); dataFileWriter = new DataFileWriter<>(datumWriter); + dataFileWriter.setCodec(codec); dataFileWriter.create(schema, out); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java index 9761076b6e..b3eecdeb33 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.OutputStream; import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.StringType; @@ -33,7 +34,7 @@ public class TestWriteAvroResultWithSchema extends TestWriteAvroResult { @Override protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException { - return new WriteAvroResultWithSchema(schema, out); + return new WriteAvroResultWithSchema(schema, out, CodecFactory.nullCodec()); } @Override