NIFI-5706 Added ConvertAvroToParquet processor

- Refactored code: ParquetBuilderProperties merged into ParquetUtils

This closes #3079.

Signed-off-by: Bryan Bende <bbende@apache.org>
Mohit Garg 2018-10-16 11:40:14 +05:30 committed by Bryan Bende
parent 02261311b3
commit e45584d0fb
11 changed files with 871 additions and 161 deletions

@@ -92,6 +92,7 @@
<exclude>src/test/resources/avro/user.avsc</exclude>
<exclude>src/test/resources/avro/user-with-array.avsc</exclude>
<exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
<exclude>src/test/resources/avro/all-minus-enum.avsc</exclude>
</excludes>
</configuration>
</plugin>

@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet;
import com.google.common.collect.ImmutableSet;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
@Tags({"avro", "parquet", "convert"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. If an incoming FlowFile does "
+ "not contain any records, an empty parquet file is the output. NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, e.g.) can "
+ "be converted to parquet, but unions of collections and other complex datatypes may not be able to be converted to Parquet.")
@WritesAttributes({
@WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"),
@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")
})
public class ConvertAvroToParquet extends AbstractProcessor {
// Attributes
public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
private volatile List<PropertyDescriptor> parquetProps;
// Relationships
static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
.description("Parquet file that was converted successfully from Avro")
.build();
static final Relationship FAILURE = new Relationship.Builder()
.name("failure")
.description("Avro content that could not be processed")
.build();
static final Set<Relationship> RELATIONSHIPS
= ImmutableSet.<Relationship>builder()
.add(SUCCESS)
.add(FAILURE)
.build();
@Override
protected final void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ParquetUtils.COMPRESSION_TYPE);
props.add(ParquetUtils.ROW_GROUP_SIZE);
props.add(ParquetUtils.PAGE_SIZE);
props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
props.add(ParquetUtils.MAX_PADDING_SIZE);
props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
props.add(ParquetUtils.ENABLE_VALIDATION);
props.add(ParquetUtils.WRITER_VERSION);
this.parquetProps = Collections.unmodifiableList(props);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return parquetProps;
}
@Override
public Set<Relationship> getRelationships() {
return RELATIONSHIPS;
}
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
long startTime = System.currentTimeMillis();
final AtomicInteger totalRecordCount = new AtomicInteger(0);
final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
FlowFile putFlowFile = flowFile;
putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
try (final InputStream in = new BufferedInputStream(rawIn);
final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) {
Schema avroSchema = dataFileReader.getSchema();
getLogger().debug(avroSchema.toString(true));
ParquetWriter<GenericRecord> writer = createParquetWriter(context, flowFile, rawOut, avroSchema );
try {
int recordCount = 0;
GenericRecord record = null;
while (dataFileReader.hasNext()) {
record = dataFileReader.next();
writer.write(record);
recordCount++;
}
totalRecordCount.set(recordCount);
} finally {
writer.close();
}
}
});
// Add attributes and transfer to success
StringBuilder newFilename = new StringBuilder();
int extensionIndex = fileName.lastIndexOf(".");
if (extensionIndex != -1) {
newFilename.append(fileName.substring(0, extensionIndex));
} else {
newFilename.append(fileName);
}
newFilename.append(".parquet");
Map<String,String> outAttributes = new HashMap<>();
outAttributes.put(CoreAttributes.FILENAME.key(), newFilename.toString());
outAttributes.put(RECORD_COUNT_ATTRIBUTE,Integer.toString(totalRecordCount.get()) );
putFlowFile = session.putAllAttributes(putFlowFile, outAttributes);
session.transfer(putFlowFile, SUCCESS);
session.getProvenanceReporter().modifyContent(putFlowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
} catch (final ProcessException pe) {
getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe});
session.transfer(flowFile, FAILURE);
}
}
private ParquetWriter createParquetWriter(final ProcessContext context, final FlowFile flowFile, final OutputStream out, final Schema schema)
throws IOException {
NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out);
final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter
.<GenericRecord>builder(nifiParquetOutputFile)
.withSchema(schema);
Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
conf.setBoolean("parquet.avro.add-list-element-records", false);
conf.setBoolean("parquet.avro.write-old-list-structure", false);
ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
return parquetWriter.build();
}
}
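
For orientation, a minimal usage sketch of the new processor with NiFi's mock framework follows. It is distilled from the TestConvertAvroToParquet test added later in this commit; the on-disk "test.avro" input and the GZIP choice are illustrative assumptions, not part of the change, and the class sits in the processor's package only so the package-private SUCCESS relationship resolves.

package org.apache.nifi.processors.parquet;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ConvertAvroToParquetSketch {
    public static void main(String[] args) throws Exception {
        TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToParquet());
        // Writer tuning is optional; the shared descriptors now live in ParquetUtils.
        runner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
        // Assumed input: any valid Avro data file on disk.
        byte[] avroBytes = Files.readAllBytes(Paths.get("test.avro"));
        runner.enqueue(avroBytes, Collections.singletonMap(CoreAttributes.FILENAME.key(), "test.avro"));
        runner.run();
        runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1);
        MockFlowFile result = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
        // The processor swaps the extension and records how many Avro records were written.
        result.assertAttributeEquals(CoreAttributes.FILENAME.key(), "test.parquet");
        System.out.println("records: " + result.getAttribute(ConvertAvroToParquet.RECORD_COUNT_ATTRIBUTE));
    }
}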

@@ -36,7 +36,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;
@SupportsBatching

@@ -32,24 +32,18 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -77,64 +71,6 @@ import java.util.List;
})
public class PutParquet extends AbstractPutHDFSRecord {
public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
.name("row-group-size")
.displayName("Row Group Size")
.description("The row group size used by the Parquet writer. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
.name("page-size")
.displayName("Page Size")
.description("The page size used by the Parquet writer. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
.name("dictionary-page-size")
.displayName("Dictionary Page Size")
.description("The dictionary page size used by the Parquet writer. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
.name("max-padding-size")
.displayName("Max Padding Size")
.description("The maximum amount of padding that will be used to align row groups with blocks in the " +
"underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
.name("enable-dictionary-encoding")
.displayName("Enable Dictionary Encoding")
.description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
.name("enable-validation")
.displayName("Enable Validation")
.description("Specifies whether validation should be enabled for the Parquet writer")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
.name("writer-version")
.displayName("Writer Version")
.description("Specifies the version used by Parquet writer")
.allowableValues(ParquetProperties.WriterVersion.values())
.build();
public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
.name("remove-crc-files")
.displayName("Remove CRC Files")
@@ -166,13 +102,13 @@ public class PutParquet extends AbstractPutHDFSRecord {
@Override
public List<PropertyDescriptor> getAdditionalProperties() {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ROW_GROUP_SIZE);
props.add(PAGE_SIZE);
props.add(DICTIONARY_PAGE_SIZE);
props.add(MAX_PADDING_SIZE);
props.add(ENABLE_DICTIONARY_ENCODING);
props.add(ENABLE_VALIDATION);
props.add(WRITER_VERSION);
props.add(ParquetUtils.ROW_GROUP_SIZE);
props.add(ParquetUtils.PAGE_SIZE);
props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
props.add(ParquetUtils.MAX_PADDING_SIZE);
props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
props.add(ParquetUtils.ENABLE_VALIDATION);
props.add(ParquetUtils.WRITER_VERSION);
props.add(REMOVE_CRC_FILES);
return Collections.unmodifiableList(props);
}
@@ -187,88 +123,11 @@ public class PutParquet extends AbstractPutHDFSRecord {
.<GenericRecord>builder(path)
.withSchema(avroSchema);
applyCommonConfig(parquetWriter, context, flowFile, conf);
ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
}
private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
builder.withConf(conf);
// Required properties
final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
builder.withWriteMode(mode);
final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName());
final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
builder.withCompressionCodec(codecName);
// Optional properties
if (context.getProperty(ROW_GROUP_SIZE).isSet()){
try {
final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (rowGroupSize != null) {
builder.withRowGroupSize(rowGroupSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(PAGE_SIZE).isSet()) {
try {
final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (pageSize != null) {
builder.withPageSize(pageSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
try {
final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (dictionaryPageSize != null) {
builder.withDictionaryPageSize(dictionaryPageSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
try {
final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (maxPaddingSize != null) {
builder.withMaxPaddingSize(maxPaddingSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
builder.withDictionaryEncoding(enableDictionaryEncoding);
}
if (context.getProperty(ENABLE_VALIDATION).isSet()) {
final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
builder.withValidation(enableValidation);
}
if (context.getProperty(WRITER_VERSION).isSet()) {
final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
}
}
@Override
protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean();

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet.stream;
import org.apache.parquet.io.PositionOutputStream;
import java.io.IOException;
import java.io.OutputStream;
public class NifiOutputStream extends PositionOutputStream {
private long position = 0;
private OutputStream outputStream;
public NifiOutputStream(OutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public long getPos() throws IOException {
return position;
}
@Override
public void write(int b) throws IOException {
position++;
outputStream.write(b);
}
@Override
public void write(byte[] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
outputStream.write(b, off, len);
position += len;
}
@Override
public void flush() throws IOException {
outputStream.flush();
}
@Override
public void close() throws IOException {
outputStream.close();
}
}
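
Parquet records byte offsets (row-group and footer positions) while it writes, so the wrapper above simply counts every byte that passes through to the underlying stream. A small sanity-check sketch, not part of the commit; the ByteArrayOutputStream target is an arbitrary stand-in for the FlowFile content stream:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.nifi.processors.parquet.stream.NifiOutputStream;

public class PositionTrackingSketch {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        NifiOutputStream out = new NifiOutputStream(target);
        out.write(new byte[] {1, 2, 3}); // position advances by the array length
        out.write(4);                    // position advances by one
        System.out.println(out.getPos() + " == " + target.size()); // both print 4
        out.close();
    }
}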

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet.stream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import java.io.OutputStream;
public class NifiParquetOutputFile implements OutputFile {
private OutputStream outputStream;
public NifiParquetOutputFile(OutputStream outputStream) {
this.outputStream = outputStream;
}
@Override
public PositionOutputStream create(long blockSizeHint) {
return new NifiOutputStream(outputStream);
}
@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) {
return new NifiOutputStream(outputStream);
}
@Override
public boolean supportsBlockSize() {
return false;
}
@Override
public long defaultBlockSize() {
return 0;
}
}
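
This OutputFile adapter is what lets AvroParquetWriter target an arbitrary OutputStream (ultimately the FlowFile content stream) instead of a Hadoop Path. A hedged sketch of that wiring against an in-memory stream; the tiny inline schema and record are assumptions made for illustration:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;

public class OutputFileSketch {
    public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"demo\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 42);
        ByteArrayOutputStream target = new ByteArrayOutputStream();
        try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                .<GenericRecord>builder(new NifiParquetOutputFile(target))
                .withSchema(schema)
                .build()) {
            writer.write(record); // Parquet bytes land in the wrapped stream, no filesystem involved
        }
        System.out.println("parquet bytes written: " + target.size());
    }
}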

@@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.parquet.PutParquet;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ParquetUtils {
public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
.name("row-group-size")
.displayName("Row Group Size")
.description("The row group size used by the Parquet writer. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
.name("page-size")
.displayName("Page Size")
.description("The page size used by the Parquet writer. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
.name("dictionary-page-size")
.displayName("Dictionary Page Size")
.description("The dictionary page size used by the Parquet writer. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
.name("max-padding-size")
.displayName("Max Padding Size")
.description("The maximum amount of padding that will be used to align row groups with blocks in the " +
"underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
"The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
.name("enable-dictionary-encoding")
.displayName("Enable Dictionary Encoding")
.description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
.name("enable-validation")
.displayName("Enable Validation")
.description("Specifies whether validation should be enabled for the Parquet writer")
.allowableValues("true", "false")
.build();
public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
.name("writer-version")
.displayName("Writer Version")
.description("Specifies the version used by Parquet writer")
.allowableValues(org.apache.parquet.column.ParquetProperties.WriterVersion.values())
.build();
public static List<AllowableValue> COMPRESSION_TYPES = getCompressionTypes();
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("compression-type")
.displayName("Compression Type")
.description("The type of compression for the file being written.")
.allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
.defaultValue(COMPRESSION_TYPES.get(0).getValue())
.required(true)
.build();
public static List<AllowableValue> getCompressionTypes() {
final List<AllowableValue> compressionTypes = new ArrayList<>();
for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
final String name = compressionCodecName.name();
compressionTypes.add(new AllowableValue(name, name));
}
return Collections.unmodifiableList(compressionTypes);
}
public static void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder,
final ProcessContext context,
final FlowFile flowFile,
final Configuration conf,
final AbstractProcessor abstractProcessor) {
builder.withConf(conf);
// Required properties
boolean overwrite = true;
if(context.getProperty(PutParquet.OVERWRITE).isSet())
overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean();
final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
builder.withWriteMode(mode);
final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName());
final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
builder.withCompressionCodec(codecName);
// Optional properties
if (context.getProperty(ROW_GROUP_SIZE).isSet()){
try {
final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (rowGroupSize != null) {
builder.withRowGroupSize(rowGroupSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(PAGE_SIZE).isSet()) {
try {
final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (pageSize != null) {
builder.withPageSize(pageSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
try {
final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (dictionaryPageSize != null) {
builder.withDictionaryPageSize(dictionaryPageSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
try {
final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
if (maxPaddingSize != null) {
builder.withMaxPaddingSize(maxPaddingSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
}
}
if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
builder.withDictionaryEncoding(enableDictionaryEncoding);
}
if (context.getProperty(ENABLE_VALIDATION).isSet()) {
final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
builder.withValidation(enableValidation);
}
if (context.getProperty(WRITER_VERSION).isSet()) {
final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
}
}
}
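
All of the size-typed properties above take NiFi's "<Data Size> <Data Unit>" syntax and are converted to bytes (asDataSize(DataUnit.B)) before being handed to the Parquet builder; the boolean and writer-version properties pass through directly. A brief configuration sketch using the relocated constants, with illustrative values rather than defaults (same-package placement is assumed, as in the bundled tests):

package org.apache.nifi.processors.parquet;

import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.parquet.column.ParquetProperties;

public class ParquetUtilsConfigSketch {
    public static void main(String[] args) {
        TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToParquet());
        runner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "128 MB");   // parsed to bytes, then builder.withRowGroupSize(...)
        runner.setProperty(ParquetUtils.PAGE_SIZE, "1 MB");
        runner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1 MB");
        runner.setProperty(ParquetUtils.ENABLE_DICTIONARY_ENCODING, "true");
        // Enum name, matching the valueOf() lookup in applyCommonConfig.
        runner.setProperty(ParquetUtils.WRITER_VERSION, ParquetProperties.WriterVersion.PARQUET_2_0.name());
        runner.assertValid(); // COMPRESSION_TYPE is required but carries a default
    }
}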

@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.parquet.PutParquet
org.apache.nifi.processors.parquet.FetchParquet
org.apache.nifi.processors.parquet.FetchParquet
org.apache.nifi.processors.parquet.ConvertAvroToParquet
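
The added line above is the service registration that makes the new processor visible to the framework; NiFi's extension loading is built on the standard java.util.ServiceLoader lookup against this META-INF/services entry. Purely as an illustration (this is not how the commit itself is exercised):

import java.util.ServiceLoader;
import org.apache.nifi.processor.Processor;

public class ProcessorDiscoverySketch {
    public static void main(String[] args) {
        // With the parquet processors jar on the classpath, the listing now includes
        // ConvertAvroToParquet alongside PutParquet and FetchParquet.
        for (Processor processor : ServiceLoader.load(Processor.class)) {
            System.out.println(processor.getClass().getName());
        }
    }
}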

@@ -42,6 +42,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -184,7 +185,7 @@ public class PutParquetTest {
@Test
public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException {
configure(proc, 100);
testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
testRunner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis();
@@ -472,7 +473,7 @@ public class PutParquetTest {
@Test
public void testRowGroupSize() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B");
testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "1024 B");
final String filename = "testRowGroupSize-" + System.currentTimeMillis();
@@ -487,7 +488,7 @@ public class PutParquetTest {
@Test
public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}");
testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "${row.group.size}");
final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -503,7 +504,7 @@ public class PutParquetTest {
@Test
public void testPageSize() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B");
testRunner.setProperty(ParquetUtils.PAGE_SIZE, "1024 B");
final String filename = "testPageGroupSize-" + System.currentTimeMillis();
@@ -518,7 +519,7 @@ public class PutParquetTest {
@Test
public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}");
testRunner.setProperty(ParquetUtils.PAGE_SIZE, "${page.size}");
final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -534,7 +535,7 @@ public class PutParquetTest {
@Test
public void testDictionaryPageSize() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B");
testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1024 B");
final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis();
@@ -549,7 +550,7 @@ public class PutParquetTest {
@Test
public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -565,7 +566,7 @@ public class PutParquetTest {
@Test
public void testMaxPaddingPageSize() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B");
testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "1024 B");
final String filename = "testMaxPaddingSize-" + System.currentTimeMillis();
@@ -580,7 +581,7 @@ public class PutParquetTest {
@Test
public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}");
testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "${max.padding.size}");
final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis();

@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.parquet;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for ConvertAvroToParquet processor
*/
public class TestConvertAvroToParquet {
private ConvertAvroToParquet processor;
private TestRunner runner;
private List<GenericRecord> records = new ArrayList<>();
File tmpAvro = new File("target/test.avro");
File tmpParquet = new File("target/test.parquet");
@Before
public void setUp() throws Exception {
processor = new ConvertAvroToParquet();
runner = TestRunners.newTestRunner(processor);
Schema schema = new Schema.Parser().parse(Resources.getResource("avro/all-minus-enum.avsc").openStream());
DataFileWriter<Object> awriter = new DataFileWriter<Object>(new GenericDatumWriter<Object>());
GenericData.Record nestedRecord = new GenericRecordBuilder(
schema.getField("mynestedrecord").schema())
.set("mynestedint", 1).build();
GenericData.Record record = new GenericRecordBuilder(schema)
.set("mynull", null)
.set("myboolean", true)
.set("myint", 1)
.set("mylong", 2L)
.set("myfloat", 3.1f)
.set("mydouble", 4.1)
.set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
.set("mystring", "hello")
.set("mynestedrecord", nestedRecord)
.set("myarray", new GenericData.Array<Integer>(Schema.createArray(Schema.create(Schema.Type.INT)), Arrays.asList(1, 2)))
.set("mymap", ImmutableMap.of("a", 1, "b", 2))
.set("myfixed", new GenericData.Fixed(Schema.createFixed("ignored", null, null, 1), new byte[] { (byte) 65 }))
.build();
awriter.create(schema, tmpAvro);
awriter.append(record);
awriter.flush();
awriter.close();
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(tmpAvro, datumReader);
GenericRecord record1 = null;
while (dataFileReader.hasNext()) {
record1 = dataFileReader.next(record1);
records.add(record1);
}
}
@Test
public void test_Processor() throws Exception {
FileInputStream fileInputStream = new FileInputStream(tmpAvro);
ByteArrayOutputStream out = new ByteArrayOutputStream();
int readedBytes;
byte[] buf = new byte[1024];
while ((readedBytes = fileInputStream.read(buf)) > 0) {
out.write(buf, 0, readedBytes);
}
out.close();
Map<String, String> attributes = new HashMap<String, String>() {{
put(CoreAttributes.FILENAME.key(), "test.avro");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1);
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
// assert meta data
assertEquals("1", resultFlowFile.getAttribute(ConvertAvroToParquet.RECORD_COUNT_ATTRIBUTE));
assertEquals("test.parquet", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
}
@Test
public void test_Meta_Info() throws Exception {
FileInputStream fileInputStream = new FileInputStream(tmpAvro);
ByteArrayOutputStream out = new ByteArrayOutputStream();
int readedBytes;
byte[] buf = new byte[1024];
while ((readedBytes = fileInputStream.read(buf)) > 0) {
out.write(buf, 0, readedBytes);
}
out.close();
Map<String, String> attributes = new HashMap<String, String>() {{
put(CoreAttributes.FILENAME.key(), "test.avro");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
// Save the flowfile
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream(tmpParquet);
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
ParquetMetadata metaData;
metaData = ParquetFileReader.readFooter(conf, new Path(tmpParquet.getAbsolutePath()), NO_FILTER);
// #number of records
long nParquetRecords = 0;
for(BlockMetaData meta : metaData.getBlocks()){
nParquetRecords += meta.getRowCount();
}
long nAvroRecord = records.size();
assertEquals(nParquetRecords, nAvroRecord);
}
@Test
public void test_Data() throws Exception {
FileInputStream fileInputStream = new FileInputStream(tmpAvro);
ByteArrayOutputStream out = new ByteArrayOutputStream();
int readedBytes;
byte[] buf = new byte[1024];
while ((readedBytes = fileInputStream.read(buf)) > 0) {
out.write(buf, 0, readedBytes);
}
out.close();
Map<String, String> attributes = new HashMap<String, String>() {{
put(CoreAttributes.FILENAME.key(), "test.avro");
}};
runner.enqueue(out.toByteArray(), attributes);
runner.run();
MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
// Save the flowfile
byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
FileOutputStream fos = new FileOutputStream(tmpParquet);
fos.write(resultContents);
fos.flush();
fos.close();
Configuration conf = new Configuration();
FileSystem fs = FileSystem.getLocal(conf);
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(tmpParquet.getAbsolutePath()))
.withConf(conf)
.build();
List<Group> parquetRecords = new ArrayList<Group>();
Group current;
current = reader.read();
while (current != null) {
assertTrue(current instanceof Group);
parquetRecords.add(current);
current = reader.read();
}
Group firstRecord = parquetRecords.get(0);
// Primitive
assertEquals(firstRecord.getInteger("myint", 0), 1);
assertEquals(firstRecord.getLong("mylong", 0), 2);
assertEquals(firstRecord.getBoolean("myboolean", 0), true);
assertEquals(firstRecord.getFloat("myfloat", 0), 3.1, 0.0001);
assertEquals(firstRecord.getDouble("mydouble", 0), 4.1, 0.001);
assertEquals(firstRecord.getString("mybytes", 0), "hello");
assertEquals(firstRecord.getString("mystring", 0), "hello");
// Nested
assertEquals(firstRecord.getGroup("mynestedrecord",0).getInteger("mynestedint",0), 1);
// Array
assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",0).getInteger("element", 0), 1);
assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",1).getInteger("element", 0), 2);
// Map
assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",0).getInteger("value", 0), 1);
assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",1).getInteger("value", 0), 2);
// Fixed
assertEquals(firstRecord.getString("myfixed",0), "A");
}
@After
public void cleanup(){
tmpAvro.delete();
tmpParquet.delete();
}
}

@@ -0,0 +1,60 @@
{
"name": "myrecord",
"namespace": "parquet.avro",
"type": "record",
"fields": [{
"name": "mynull",
"type": "null"
}, {
"name": "myboolean",
"type": "boolean"
}, {
"name": "myint",
"type": "int"
}, {
"name": "mylong",
"type": "long"
}, {
"name": "myfloat",
"type": "float"
}, {
"name": "mydouble",
"type": "double"
}, {
"name": "mybytes",
"type": "bytes"
}, {
"name": "mystring",
"type": "string"
}, {
"name": "mynestedrecord",
"type": {
"type": "record",
"name": "ignored1",
"fields": [{
"name": "mynestedint",
"type": "int"
}]
}
}, {
"name": "myarray",
"type": {
"type": "array",
"items": "int"
}
}, {
"name": "mymap",
"type": {
"type": "map",
"values": "int"
}
}, {
"name": "myfixed",
"type": {
"type": "fixed",
"name": "ignored3",
"namespace": "",
"size": 1
}
}]
}