From e45584d0fb8e19f6913fb25a8db3ce8303b73cd0 Mon Sep 17 00:00:00 2001 From: Mohit Garg Date: Tue, 16 Oct 2018 11:40:14 +0530 Subject: [PATCH] NIFI-5706 Added ConvertAvroToParquet processor - Refactored code : ParquetBuilderProperties merged in ParquetUtils This closes #3079. Signed-off-by: Bryan Bende --- .../nifi-parquet-processors/pom.xml | 1 + .../parquet/ConvertAvroToParquet.java | 204 ++++++++++++++ .../nifi/processors/parquet/FetchParquet.java | 1 - .../nifi/processors/parquet/PutParquet.java | 159 +---------- .../parquet/stream/NifiOutputStream.java | 66 +++++ .../parquet/stream/NifiParquetOutputFile.java | 54 ++++ .../parquet/utils/ParquetUtils.java | 202 ++++++++++++++ .../org.apache.nifi.processor.Processor | 3 +- .../processors/parquet/PutParquetTest.java | 19 +- .../parquet/TestConvertAvroToParquet.java | 263 ++++++++++++++++++ .../test/resources/avro/all-minus-enum.avsc | 60 ++++ 11 files changed, 871 insertions(+), 161 deletions(-) create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml index c8aa0edb06..d458d27ce0 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml @@ -92,6 +92,7 @@ src/test/resources/avro/user.avsc src/test/resources/avro/user-with-array.avsc src/test/resources/avro/user-with-nullable-array.avsc + src/test/resources/avro/all-minus-enum.avsc diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java new file mode 100644 index 0000000000..0bcf606f22 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.parquet; + + +import com.google.common.collect.ImmutableSet; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile; +import org.apache.nifi.processors.parquet.utils.ParquetUtils; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.hadoop.ParquetWriter; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.BufferedInputStream; +import java.io.IOException; +import java.util.Map; +import java.util.HashMap; +import java.util.List; +import java.util.ArrayList; +import java.util.Set; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +@Tags({"avro", "parquet", "convert"}) +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. If an incoming FlowFile does " + + "not contain any records, an empty parquet file is the output. NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, e.g.) can " + + "be converted to parquet, but unions of collections and other complex datatypes may not be able to be converted to Parquet.") +@WritesAttributes({ + @WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"), + @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.") +}) +public class ConvertAvroToParquet extends AbstractProcessor { + + // Attributes + public static final String RECORD_COUNT_ATTRIBUTE = "record.count"; + + private volatile List parquetProps; + + // Relationships + static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Parquet file that was converted successfully from Avro") + .build(); + + static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description("Avro content that could not be processed") + .build(); + + static final Set RELATIONSHIPS + = ImmutableSet.builder() + .add(SUCCESS) + .add(FAILURE) + .build(); + + @Override + protected final void init(final ProcessorInitializationContext context) { + + + final List props = new ArrayList<>(); + + props.add(ParquetUtils.COMPRESSION_TYPE); + props.add(ParquetUtils.ROW_GROUP_SIZE); + props.add(ParquetUtils.PAGE_SIZE); + props.add(ParquetUtils.DICTIONARY_PAGE_SIZE); + props.add(ParquetUtils.MAX_PADDING_SIZE); + props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING); + props.add(ParquetUtils.ENABLE_VALIDATION); + props.add(ParquetUtils.WRITER_VERSION); + + this.parquetProps = Collections.unmodifiableList(props); + } + + @Override + protected List getSupportedPropertyDescriptors() { + return parquetProps; + } + + @Override + public Set getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + + final FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + + long startTime = System.currentTimeMillis(); + final AtomicInteger totalRecordCount = new AtomicInteger(0); + + final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key()); + + FlowFile putFlowFile = flowFile; + + putFlowFile = session.write(flowFile, (rawIn, rawOut) -> { + try (final InputStream in = new BufferedInputStream(rawIn); + final DataFileStream dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) { + + Schema avroSchema = dataFileReader.getSchema(); + getLogger().debug(avroSchema.toString(true)); + ParquetWriter writer = createParquetWriter(context, flowFile, rawOut, avroSchema ); + + try { + int recordCount = 0; + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(); + writer.write(record); + recordCount++; + } + totalRecordCount.set(recordCount); + } finally { + writer.close(); + } + } + }); + + // Add attributes and transfer to success + StringBuilder newFilename = new StringBuilder(); + int extensionIndex = fileName.lastIndexOf("."); + if (extensionIndex != -1) { + newFilename.append(fileName.substring(0, extensionIndex)); + } else { + newFilename.append(fileName); + } + newFilename.append(".parquet"); + + Map outAttributes = new HashMap<>(); + outAttributes.put(CoreAttributes.FILENAME.key(), newFilename.toString()); + outAttributes.put(RECORD_COUNT_ATTRIBUTE,Integer.toString(totalRecordCount.get()) ); + + putFlowFile = session.putAllAttributes(putFlowFile, outAttributes); + session.transfer(putFlowFile, SUCCESS); + session.getProvenanceReporter().modifyContent(putFlowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime); + + } catch (final ProcessException pe) { + getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe}); + session.transfer(flowFile, FAILURE); + } + + } + + private ParquetWriter createParquetWriter(final ProcessContext context, final FlowFile flowFile, final OutputStream out, final Schema schema) + throws IOException { + + NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out); + + final AvroParquetWriter.Builder parquetWriter = AvroParquetWriter + .builder(nifiParquetOutputFile) + .withSchema(schema); + + Configuration conf = new Configuration(); + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true); + conf.setBoolean("parquet.avro.add-list-element-records", false); + conf.setBoolean("parquet.avro.write-old-list-structure", false); + + ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this); + + return parquetWriter.build(); + } + +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java index 9aa0d82ce0..f4a6875327 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java @@ -36,7 +36,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordReader; import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.hadoop.ParquetReader; - import java.io.IOException; @SupportsBatching diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java index 8b8b814c01..c2794ac039 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java @@ -32,24 +32,18 @@ import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord; import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter; +import org.apache.nifi.processors.parquet.utils.ParquetUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.parquet.avro.AvroParquetWriter; -import org.apache.parquet.column.ParquetProperties; -import org.apache.parquet.hadoop.ParquetFileWriter; -import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -77,64 +71,6 @@ import java.util.List; }) public class PutParquet extends AbstractPutHDFSRecord { - public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder() - .name("row-group-size") - .displayName("Row Group Size") - .description("The row group size used by the Parquet writer. " + - "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() - .name("page-size") - .displayName("Page Size") - .description("The page size used by the Parquet writer. " + - "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder() - .name("dictionary-page-size") - .displayName("Dictionary Page Size") - .description("The dictionary page size used by the Parquet writer. " + - "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder() - .name("max-padding-size") - .displayName("Max Padding Size") - .description("The maximum amount of padding that will be used to align row groups with blocks in the " + - "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " + - "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .build(); - - public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder() - .name("enable-dictionary-encoding") - .displayName("Enable Dictionary Encoding") - .description("Specifies whether dictionary encoding should be enabled for the Parquet writer") - .allowableValues("true", "false") - .build(); - - public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder() - .name("enable-validation") - .displayName("Enable Validation") - .description("Specifies whether validation should be enabled for the Parquet writer") - .allowableValues("true", "false") - .build(); - - public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder() - .name("writer-version") - .displayName("Writer Version") - .description("Specifies the version used by Parquet writer") - .allowableValues(ParquetProperties.WriterVersion.values()) - .build(); - public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder() .name("remove-crc-files") .displayName("Remove CRC Files") @@ -166,13 +102,13 @@ public class PutParquet extends AbstractPutHDFSRecord { @Override public List getAdditionalProperties() { final List props = new ArrayList<>(); - props.add(ROW_GROUP_SIZE); - props.add(PAGE_SIZE); - props.add(DICTIONARY_PAGE_SIZE); - props.add(MAX_PADDING_SIZE); - props.add(ENABLE_DICTIONARY_ENCODING); - props.add(ENABLE_VALIDATION); - props.add(WRITER_VERSION); + props.add(ParquetUtils.ROW_GROUP_SIZE); + props.add(ParquetUtils.PAGE_SIZE); + props.add(ParquetUtils.DICTIONARY_PAGE_SIZE); + props.add(ParquetUtils.MAX_PADDING_SIZE); + props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING); + props.add(ParquetUtils.ENABLE_VALIDATION); + props.add(ParquetUtils.WRITER_VERSION); props.add(REMOVE_CRC_FILES); return Collections.unmodifiableList(props); } @@ -187,88 +123,11 @@ public class PutParquet extends AbstractPutHDFSRecord { .builder(path) .withSchema(avroSchema); - applyCommonConfig(parquetWriter, context, flowFile, conf); + ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this); return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema); } - private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) { - builder.withConf(conf); - - // Required properties - - final boolean overwrite = context.getProperty(OVERWRITE).asBoolean(); - final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; - builder.withWriteMode(mode); - - final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName()); - final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue(); - - final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue); - builder.withCompressionCodec(codecName); - - // Optional properties - - if (context.getProperty(ROW_GROUP_SIZE).isSet()){ - try { - final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); - if (rowGroupSize != null) { - builder.withRowGroupSize(rowGroupSize.intValue()); - } - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e); - } - } - - if (context.getProperty(PAGE_SIZE).isSet()) { - try { - final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); - if (pageSize != null) { - builder.withPageSize(pageSize.intValue()); - } - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e); - } - } - - if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) { - try { - final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); - if (dictionaryPageSize != null) { - builder.withDictionaryPageSize(dictionaryPageSize.intValue()); - } - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e); - } - } - - if (context.getProperty(MAX_PADDING_SIZE).isSet()) { - try { - final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); - if (maxPaddingSize != null) { - builder.withMaxPaddingSize(maxPaddingSize.intValue()); - } - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e); - } - } - - if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) { - final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean(); - builder.withDictionaryEncoding(enableDictionaryEncoding); - } - - if (context.getProperty(ENABLE_VALIDATION).isSet()) { - final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean(); - builder.withValidation(enableValidation); - } - - if (context.getProperty(WRITER_VERSION).isSet()) { - final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue(); - builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue)); - } - } - @Override protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) { final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean(); diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java new file mode 100644 index 0000000000..acb2dc46a2 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.parquet.stream; + +import org.apache.parquet.io.PositionOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +public class NifiOutputStream extends PositionOutputStream { + private long position = 0; + private OutputStream outputStream; + + public NifiOutputStream(OutputStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public long getPos() throws IOException { + return position; + } + + @Override + public void write(int b) throws IOException { + position++; + outputStream.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + outputStream.write(b, off, len); + position += len; + } + + @Override + public void flush() throws IOException { + outputStream.flush(); + } + + @Override + public void close() throws IOException { + outputStream.close(); + } +} + diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java new file mode 100644 index 0000000000..d549b7bf11 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.parquet.stream; + +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; + +import java.io.OutputStream; + +public class NifiParquetOutputFile implements OutputFile { + + private OutputStream outputStream; + + public NifiParquetOutputFile(OutputStream outputStream) { + this.outputStream = outputStream; + } + + @Override + public PositionOutputStream create(long blockSizeHint) { + return new NifiOutputStream(outputStream); + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) { + return new NifiOutputStream(outputStream); + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 0; + } + +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java new file mode 100644 index 0000000000..7b116c233e --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.parquet.utils; + +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.parquet.PutParquet; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ParquetUtils { + + public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder() + .name("row-group-size") + .displayName("Row Group Size") + .description("The row group size used by the Parquet writer. " + + "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder() + .name("page-size") + .displayName("Page Size") + .description("The page size used by the Parquet writer. " + + "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder() + .name("dictionary-page-size") + .displayName("Dictionary Page Size") + .description("The dictionary page size used by the Parquet writer. " + + "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder() + .name("max-padding-size") + .displayName("Max Padding Size") + .description("The maximum amount of padding that will be used to align row groups with blocks in the " + + "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " + + "The value is specified in the format of where Data Unit is one of B, KB, MB, GB, TB.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + + public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder() + .name("enable-dictionary-encoding") + .displayName("Enable Dictionary Encoding") + .description("Specifies whether dictionary encoding should be enabled for the Parquet writer") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder() + .name("enable-validation") + .displayName("Enable Validation") + .description("Specifies whether validation should be enabled for the Parquet writer") + .allowableValues("true", "false") + .build(); + + public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder() + .name("writer-version") + .displayName("Writer Version") + .description("Specifies the version used by Parquet writer") + .allowableValues(org.apache.parquet.column.ParquetProperties.WriterVersion.values()) + .build(); + + public static List COMPRESSION_TYPES = getCompressionTypes(); + + public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() + .name("compression-type") + .displayName("Compression Type") + .description("The type of compression for the file being written.") + .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0])) + .defaultValue(COMPRESSION_TYPES.get(0).getValue()) + .required(true) + .build(); + + public static List getCompressionTypes() { + final List compressionTypes = new ArrayList<>(); + for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) { + final String name = compressionCodecName.name(); + compressionTypes.add(new AllowableValue(name, name)); + } + return Collections.unmodifiableList(compressionTypes); + } + + public static void applyCommonConfig(final ParquetWriter.Builder builder, + final ProcessContext context, + final FlowFile flowFile, + final Configuration conf, + final AbstractProcessor abstractProcessor) { + builder.withConf(conf); + + // Required properties + boolean overwrite = true; + if(context.getProperty(PutParquet.OVERWRITE).isSet()) + overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean(); + + final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; + builder.withWriteMode(mode); + + final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName()); + + final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue(); + + final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue); + builder.withCompressionCodec(codecName); + + // Optional properties + + if (context.getProperty(ROW_GROUP_SIZE).isSet()){ + try { + final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (rowGroupSize != null) { + builder.withRowGroupSize(rowGroupSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(PAGE_SIZE).isSet()) { + try { + final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (pageSize != null) { + builder.withPageSize(pageSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) { + try { + final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (dictionaryPageSize != null) { + builder.withDictionaryPageSize(dictionaryPageSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(MAX_PADDING_SIZE).isSet()) { + try { + final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + if (maxPaddingSize != null) { + builder.withMaxPaddingSize(maxPaddingSize.intValue()); + } + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e); + } + } + + if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) { + final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean(); + builder.withDictionaryEncoding(enableDictionaryEncoding); + } + + if (context.getProperty(ENABLE_VALIDATION).isSet()) { + final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean(); + builder.withValidation(enableValidation); + } + + if (context.getProperty(WRITER_VERSION).isSet()) { + final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue(); + builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue)); + } + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 36583e3153..6826d6e74b 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.processors.parquet.PutParquet -org.apache.nifi.processors.parquet.FetchParquet \ No newline at end of file +org.apache.nifi.processors.parquet.FetchParquet +org.apache.nifi.processors.parquet.ConvertAvroToParquet \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java index 9e7943e8aa..555dc605a4 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java @@ -42,6 +42,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.hadoop.exception.FailureException; import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; +import org.apache.nifi.processors.parquet.utils.ParquetUtils; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; @@ -184,7 +185,7 @@ public class PutParquetTest { @Test public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException { configure(proc, 100); - testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name()); + testRunner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name()); final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis(); @@ -472,7 +473,7 @@ public class PutParquetTest { @Test public void testRowGroupSize() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B"); + testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "1024 B"); final String filename = "testRowGroupSize-" + System.currentTimeMillis(); @@ -487,7 +488,7 @@ public class PutParquetTest { @Test public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}"); + testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "${row.group.size}"); final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); @@ -503,7 +504,7 @@ public class PutParquetTest { @Test public void testPageSize() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B"); + testRunner.setProperty(ParquetUtils.PAGE_SIZE, "1024 B"); final String filename = "testPageGroupSize-" + System.currentTimeMillis(); @@ -518,7 +519,7 @@ public class PutParquetTest { @Test public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}"); + testRunner.setProperty(ParquetUtils.PAGE_SIZE, "${page.size}"); final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); @@ -534,7 +535,7 @@ public class PutParquetTest { @Test public void testDictionaryPageSize() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B"); + testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1024 B"); final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis(); @@ -549,7 +550,7 @@ public class PutParquetTest { @Test public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}"); + testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}"); final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); @@ -565,7 +566,7 @@ public class PutParquetTest { @Test public void testMaxPaddingPageSize() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B"); + testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "1024 B"); final String filename = "testMaxPaddingSize-" + System.currentTimeMillis(); @@ -580,7 +581,7 @@ public class PutParquetTest { @Test public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException { configure(proc, 10); - testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}"); + testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "${max.padding.size}"); final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis(); diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java new file mode 100644 index 0000000000..261de9bd2d --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.parquet; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.DatumReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for ConvertAvroToParquet processor + */ +public class TestConvertAvroToParquet { + + private ConvertAvroToParquet processor; + private TestRunner runner; + + private List records = new ArrayList<>(); + File tmpAvro = new File("target/test.avro"); + File tmpParquet = new File("target/test.parquet"); + + @Before + public void setUp() throws Exception { + processor = new ConvertAvroToParquet(); + runner = TestRunners.newTestRunner(processor); + + Schema schema = new Schema.Parser().parse(Resources.getResource("avro/all-minus-enum.avsc").openStream()); + + DataFileWriter awriter = new DataFileWriter(new GenericDatumWriter()); + GenericData.Record nestedRecord = new GenericRecordBuilder( + schema.getField("mynestedrecord").schema()) + .set("mynestedint", 1).build(); + + GenericData.Record record = new GenericRecordBuilder(schema) + .set("mynull", null) + .set("myboolean", true) + .set("myint", 1) + .set("mylong", 2L) + .set("myfloat", 3.1f) + .set("mydouble", 4.1) + .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8))) + .set("mystring", "hello") + .set("mynestedrecord", nestedRecord) + .set("myarray", new GenericData.Array(Schema.createArray(Schema.create(Schema.Type.INT)), Arrays.asList(1, 2))) + .set("mymap", ImmutableMap.of("a", 1, "b", 2)) + .set("myfixed", new GenericData.Fixed(Schema.createFixed("ignored", null, null, 1), new byte[] { (byte) 65 })) + .build(); + + awriter.create(schema, tmpAvro); + awriter.append(record); + awriter.flush(); + awriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader dataFileReader = new DataFileReader(tmpAvro, datumReader); + GenericRecord record1 = null; + while (dataFileReader.hasNext()) { + record1 = dataFileReader.next(record1); + records.add(record1); + } + + } + + @Test + public void test_Processor() throws Exception { + + FileInputStream fileInputStream = new FileInputStream(tmpAvro); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int readedBytes; + byte[] buf = new byte[1024]; + while ((readedBytes = fileInputStream.read(buf)) > 0) { + out.write(buf, 0, readedBytes); + } + out.close(); + + Map attributes = new HashMap() {{ + put(CoreAttributes.FILENAME.key(), "test.avro"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1); + + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0); + + // assert meta data + assertEquals("1", resultFlowFile.getAttribute(ConvertAvroToParquet.RECORD_COUNT_ATTRIBUTE)); + assertEquals("test.parquet", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key())); + + + } + + @Test + public void test_Meta_Info() throws Exception { + + FileInputStream fileInputStream = new FileInputStream(tmpAvro); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int readedBytes; + byte[] buf = new byte[1024]; + while ((readedBytes = fileInputStream.read(buf)) > 0) { + out.write(buf, 0, readedBytes); + } + out.close(); + + Map attributes = new HashMap() {{ + put(CoreAttributes.FILENAME.key(), "test.avro"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0); + + // Save the flowfile + byte[] resultContents = runner.getContentAsByteArray(resultFlowFile); + FileOutputStream fos = new FileOutputStream(tmpParquet); + fos.write(resultContents); + fos.flush(); + fos.close(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + ParquetMetadata metaData; + metaData = ParquetFileReader.readFooter(conf, new Path(tmpParquet.getAbsolutePath()), NO_FILTER); + + // #number of records + long nParquetRecords = 0; + for(BlockMetaData meta : metaData.getBlocks()){ + nParquetRecords += meta.getRowCount(); + } + long nAvroRecord = records.size(); + + assertEquals(nParquetRecords, nAvroRecord); + } + + @Test + public void test_Data() throws Exception { + + + FileInputStream fileInputStream = new FileInputStream(tmpAvro); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + int readedBytes; + byte[] buf = new byte[1024]; + while ((readedBytes = fileInputStream.read(buf)) > 0) { + out.write(buf, 0, readedBytes); + } + out.close(); + + Map attributes = new HashMap() {{ + put(CoreAttributes.FILENAME.key(), "test.avro"); + }}; + runner.enqueue(out.toByteArray(), attributes); + runner.run(); + MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0); + + // Save the flowfile + byte[] resultContents = runner.getContentAsByteArray(resultFlowFile); + FileOutputStream fos = new FileOutputStream(tmpParquet); + fos.write(resultContents); + fos.flush(); + fos.close(); + + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.getLocal(conf); + ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), new Path(tmpParquet.getAbsolutePath())) + .withConf(conf) + .build(); + + List parquetRecords = new ArrayList(); + + Group current; + current = reader.read(); + while (current != null) { + assertTrue(current instanceof Group); + parquetRecords.add(current); + current = reader.read(); + } + + Group firstRecord = parquetRecords.get(0); + + // Primitive + assertEquals(firstRecord.getInteger("myint", 0), 1); + assertEquals(firstRecord.getLong("mylong", 0), 2); + assertEquals(firstRecord.getBoolean("myboolean", 0), true); + assertEquals(firstRecord.getFloat("myfloat", 0), 3.1, 0.0001); + assertEquals(firstRecord.getDouble("mydouble", 0), 4.1, 0.001); + assertEquals(firstRecord.getString("mybytes", 0), "hello"); + assertEquals(firstRecord.getString("mystring", 0), "hello"); + + // Nested + assertEquals(firstRecord.getGroup("mynestedrecord",0).getInteger("mynestedint",0), 1); + + // Array + assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",0).getInteger("element", 0), 1); + assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",1).getInteger("element", 0), 2); + + // Map + assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",0).getInteger("value", 0), 1); + assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",1).getInteger("value", 0), 2); + + // Fixed + assertEquals(firstRecord.getString("myfixed",0), "A"); + + } + + @After + public void cleanup(){ + tmpAvro.delete(); + tmpParquet.delete(); + + } + + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc new file mode 100644 index 0000000000..3a383a0a88 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc @@ -0,0 +1,60 @@ +{ + "name": "myrecord", + "namespace": "parquet.avro", + "type": "record", + "fields": [{ + "name": "mynull", + "type": "null" + }, { + "name": "myboolean", + "type": "boolean" + }, { + "name": "myint", + "type": "int" + }, { + "name": "mylong", + "type": "long" + }, { + "name": "myfloat", + "type": "float" + }, { + "name": "mydouble", + "type": "double" + }, { + "name": "mybytes", + "type": "bytes" + }, { + "name": "mystring", + "type": "string" + }, { + "name": "mynestedrecord", + "type": { + "type": "record", + "name": "ignored1", + "fields": [{ + "name": "mynestedint", + "type": "int" + }] + } + }, { + "name": "myarray", + "type": { + "type": "array", + "items": "int" + } + }, { + "name": "mymap", + "type": { + "type": "map", + "values": "int" + } + }, { + "name": "myfixed", + "type": { + "type": "fixed", + "name": "ignored3", + "namespace": "", + "size": 1 + } + }] +} \ No newline at end of file