diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index c8aa0edb06..d458d27ce0 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -92,6 +92,7 @@
                         <exclude>src/test/resources/avro/user.avsc</exclude>
                         <exclude>src/test/resources/avro/user-with-array.avsc</exclude>
                         <exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
+                        <exclude>src/test/resources/avro/all-minus-enum.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
new file mode 100644
index 0000000000..0bcf606f22
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetWriter;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid Avro file. If an incoming FlowFile does "
+        + "not contain any records, an empty Parquet file is the output. NOTE: Many Avro datatypes (e.g. collections, primitives, and unions of primitives) can "
+        + "be converted to Parquet, but unions of collections and other complex datatypes may not be convertible to Parquet.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with its extension replaced by, or appended with, .parquet"),
+ @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+ // Attributes
+ public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    private volatile List<PropertyDescriptor> parquetProps;
+
+ // Relationships
+ static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Parquet file that was converted successfully from Avro")
+ .build();
+
+ static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Avro content that could not be processed")
+ .build();
+
+    static final Set<Relationship> RELATIONSHIPS
+            = ImmutableSet.<Relationship>builder()
+ .add(SUCCESS)
+ .add(FAILURE)
+ .build();
+
+ @Override
+ protected final void init(final ProcessorInitializationContext context) {
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+
+ props.add(ParquetUtils.COMPRESSION_TYPE);
+ props.add(ParquetUtils.ROW_GROUP_SIZE);
+ props.add(ParquetUtils.PAGE_SIZE);
+ props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
+ props.add(ParquetUtils.MAX_PADDING_SIZE);
+ props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
+ props.add(ParquetUtils.ENABLE_VALIDATION);
+ props.add(ParquetUtils.WRITER_VERSION);
+
+ this.parquetProps = Collections.unmodifiableList(props);
+ }
+
+ @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return parquetProps;
+ }
+
+ @Override
+    public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ try {
+
+ long startTime = System.currentTimeMillis();
+ final AtomicInteger totalRecordCount = new AtomicInteger(0);
+
+ final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+ FlowFile putFlowFile = flowFile;
+
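+            // Stream the Avro records from the incoming FlowFile content straight into a Parquet
+            // writer that wraps the FlowFile's output stream, so no intermediate file is written.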
+ putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
+ try (final InputStream in = new BufferedInputStream(rawIn);
+                     final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+ Schema avroSchema = dataFileReader.getSchema();
+ getLogger().debug(avroSchema.toString(true));
+                ParquetWriter<GenericRecord> writer = createParquetWriter(context, flowFile, rawOut, avroSchema);
+
+ try {
+ int recordCount = 0;
+ GenericRecord record = null;
+ while (dataFileReader.hasNext()) {
+ record = dataFileReader.next();
+ writer.write(record);
+ recordCount++;
+ }
+ totalRecordCount.set(recordCount);
+ } finally {
+ writer.close();
+ }
+ }
+ });
+
+ // Add attributes and transfer to success
+ StringBuilder newFilename = new StringBuilder();
+ int extensionIndex = fileName.lastIndexOf(".");
+ if (extensionIndex != -1) {
+ newFilename.append(fileName.substring(0, extensionIndex));
+ } else {
+ newFilename.append(fileName);
+ }
+ newFilename.append(".parquet");
+
+            Map<String, String> outAttributes = new HashMap<>();
+            outAttributes.put(CoreAttributes.FILENAME.key(), newFilename.toString());
+            outAttributes.put(RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
+
+ putFlowFile = session.putAllAttributes(putFlowFile, outAttributes);
+ session.transfer(putFlowFile, SUCCESS);
+            session.getProvenanceReporter().modifyContent(putFlowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
+
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe});
+ session.transfer(flowFile, FAILURE);
+ }
+
+ }
+
+    private ParquetWriter<GenericRecord> createParquetWriter(final ProcessContext context, final FlowFile flowFile, final OutputStream out, final Schema schema)
+ throws IOException {
+
+ NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out);
+
+        final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter
+                .<GenericRecord>builder(nifiParquetOutputFile)
+                .withSchema(schema);
+
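+        // Parquet/Avro interoperability settings: enable Avro compatibility and write lists using the
+        // modern three-level structure rather than the legacy two-level layout of older parquet-avro.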
+ Configuration conf = new Configuration();
+ conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
+ conf.setBoolean("parquet.avro.add-list-element-records", false);
+ conf.setBoolean("parquet.avro.write-old-list-structure", false);
+
+ ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
+
+ return parquetWriter.build();
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index 9aa0d82ce0..f4a6875327 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -36,7 +36,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
-
import java.io.IOException;
@SupportsBatching
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 8b8b814c01..c2794ac039 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -32,24 +32,18 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -77,64 +71,6 @@ import java.util.List;
})
public class PutParquet extends AbstractPutHDFSRecord {
- public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
- .name("row-group-size")
- .displayName("Row Group Size")
- .description("The row group size used by the Parquet writer. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
- .name("page-size")
- .displayName("Page Size")
- .description("The page size used by the Parquet writer. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
- .name("dictionary-page-size")
- .displayName("Dictionary Page Size")
- .description("The dictionary page size used by the Parquet writer. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
- .name("max-padding-size")
- .displayName("Max Padding Size")
- .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
- "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
- .name("enable-dictionary-encoding")
- .displayName("Enable Dictionary Encoding")
- .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
- .allowableValues("true", "false")
- .build();
-
- public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
- .name("enable-validation")
- .displayName("Enable Validation")
- .description("Specifies whether validation should be enabled for the Parquet writer")
- .allowableValues("true", "false")
- .build();
-
- public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
- .name("writer-version")
- .displayName("Writer Version")
- .description("Specifies the version used by Parquet writer")
- .allowableValues(ParquetProperties.WriterVersion.values())
- .build();
-
public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
.name("remove-crc-files")
.displayName("Remove CRC Files")
@@ -166,13 +102,13 @@ public class PutParquet extends AbstractPutHDFSRecord {
@Override
     public List<PropertyDescriptor> getAdditionalProperties() {
         final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(ROW_GROUP_SIZE);
- props.add(PAGE_SIZE);
- props.add(DICTIONARY_PAGE_SIZE);
- props.add(MAX_PADDING_SIZE);
- props.add(ENABLE_DICTIONARY_ENCODING);
- props.add(ENABLE_VALIDATION);
- props.add(WRITER_VERSION);
+ props.add(ParquetUtils.ROW_GROUP_SIZE);
+ props.add(ParquetUtils.PAGE_SIZE);
+ props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
+ props.add(ParquetUtils.MAX_PADDING_SIZE);
+ props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
+ props.add(ParquetUtils.ENABLE_VALIDATION);
+ props.add(ParquetUtils.WRITER_VERSION);
props.add(REMOVE_CRC_FILES);
return Collections.unmodifiableList(props);
}
@@ -187,88 +123,11 @@ public class PutParquet extends AbstractPutHDFSRecord {
.builder(path)
.withSchema(avroSchema);
- applyCommonConfig(parquetWriter, context, flowFile, conf);
+ ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
}
-    private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
- builder.withConf(conf);
-
- // Required properties
-
- final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
- final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
- builder.withWriteMode(mode);
-
- final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName());
- final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
-
- final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
- builder.withCompressionCodec(codecName);
-
- // Optional properties
-
- if (context.getProperty(ROW_GROUP_SIZE).isSet()){
- try {
- final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (rowGroupSize != null) {
- builder.withRowGroupSize(rowGroupSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(PAGE_SIZE).isSet()) {
- try {
- final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (pageSize != null) {
- builder.withPageSize(pageSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
- try {
- final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (dictionaryPageSize != null) {
- builder.withDictionaryPageSize(dictionaryPageSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
- try {
- final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (maxPaddingSize != null) {
- builder.withMaxPaddingSize(maxPaddingSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
- final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
- builder.withDictionaryEncoding(enableDictionaryEncoding);
- }
-
- if (context.getProperty(ENABLE_VALIDATION).isSet()) {
- final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
- builder.withValidation(enableValidation);
- }
-
- if (context.getProperty(WRITER_VERSION).isSet()) {
- final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
- builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
- }
- }
-
@Override
protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean();
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
new file mode 100644
index 0000000000..acb2dc46a2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.stream;
+
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
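+/**
+ * A PositionOutputStream that wraps a plain OutputStream and counts the bytes written,
+ * since the Parquet writer relies on getPos() to record offsets within the file.
+ */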
+public class NifiOutputStream extends PositionOutputStream {
+ private long position = 0;
+ private OutputStream outputStream;
+
+ public NifiOutputStream(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ position++;
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ outputStream.write(b, off, len);
+ position += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputStream.close();
+ }
+}
+
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
new file mode 100644
index 0000000000..d549b7bf11
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.stream;
+
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.OutputStream;
+
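+/**
+ * An OutputFile implementation that lets a ParquetWriter target an arbitrary OutputStream
+ * (such as FlowFile content) instead of a Hadoop Path; block-size hints are not supported.
+ */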
+public class NifiParquetOutputFile implements OutputFile {
+
+ private OutputStream outputStream;
+
+ public NifiParquetOutputFile(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) {
+ return new NifiOutputStream(outputStream);
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+ return new NifiOutputStream(outputStream);
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return false;
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return 0;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
new file mode 100644
index 0000000000..7b116c233e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.PutParquet;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
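+/**
+ * Property descriptors and writer configuration shared by the Parquet processors
+ * (PutParquet and ConvertAvroToParquet).
+ */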
+public class ParquetUtils {
+
+ public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+ .name("row-group-size")
+ .displayName("Row Group Size")
+ .description("The row group size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+ .name("page-size")
+ .displayName("Page Size")
+ .description("The page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+ .name("dictionary-page-size")
+ .displayName("Dictionary Page Size")
+ .description("The dictionary page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+ .name("max-padding-size")
+ .displayName("Max Padding Size")
+ .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
+ "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+ .name("enable-dictionary-encoding")
+ .displayName("Enable Dictionary Encoding")
+ .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
+ .name("enable-validation")
+ .displayName("Enable Validation")
+ .description("Specifies whether validation should be enabled for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
+ .name("writer-version")
+ .displayName("Writer Version")
+ .description("Specifies the version used by Parquet writer")
+            .allowableValues(ParquetProperties.WriterVersion.values())
+ .build();
+
+    public static List<AllowableValue> COMPRESSION_TYPES = getCompressionTypes();
+
+ public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+ .name("compression-type")
+ .displayName("Compression Type")
+ .description("The type of compression for the file being written.")
+ .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
+ .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+ .required(true)
+ .build();
+
+    public static List<AllowableValue> getCompressionTypes() {
+        final List<AllowableValue> compressionTypes = new ArrayList<>();
+ for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
+ final String name = compressionCodecName.name();
+ compressionTypes.add(new AllowableValue(name, name));
+ }
+ return Collections.unmodifiableList(compressionTypes);
+ }
+
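+    /**
+     * Applies the writer settings shared by the Parquet processors (write mode, compression codec,
+     * sizes, dictionary encoding, validation and writer version) to the supplied builder.
+     */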
+    public static void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder,
+ final ProcessContext context,
+ final FlowFile flowFile,
+ final Configuration conf,
+ final AbstractProcessor abstractProcessor) {
+ builder.withConf(conf);
+
+ // Required properties
+ boolean overwrite = true;
+        if (context.getProperty(PutParquet.OVERWRITE).isSet()) {
+            overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean();
+        }
+
+ final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+ builder.withWriteMode(mode);
+
+ final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName());
+
+ final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
+
+ final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
+ builder.withCompressionCodec(codecName);
+
+ // Optional properties
+
+        if (context.getProperty(ROW_GROUP_SIZE).isSet()) {
+ try {
+ final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (rowGroupSize != null) {
+ builder.withRowGroupSize(rowGroupSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(PAGE_SIZE).isSet()) {
+ try {
+ final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (pageSize != null) {
+ builder.withPageSize(pageSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
+ try {
+ final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (dictionaryPageSize != null) {
+ builder.withDictionaryPageSize(dictionaryPageSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
+ try {
+ final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (maxPaddingSize != null) {
+ builder.withMaxPaddingSize(maxPaddingSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
+ final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
+ builder.withDictionaryEncoding(enableDictionaryEncoding);
+ }
+
+ if (context.getProperty(ENABLE_VALIDATION).isSet()) {
+ final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
+ builder.withValidation(enableValidation);
+ }
+
+ if (context.getProperty(WRITER_VERSION).isSet()) {
+ final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
+ builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 36583e3153..6826d6e74b 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.parquet.PutParquet
-org.apache.nifi.processors.parquet.FetchParquet
\ No newline at end of file
+org.apache.nifi.processors.parquet.FetchParquet
+org.apache.nifi.processors.parquet.ConvertAvroToParquet
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 9e7943e8aa..555dc605a4 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -42,6 +42,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -184,7 +185,7 @@ public class PutParquetTest {
@Test
public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException {
configure(proc, 100);
- testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
+ testRunner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis();
@@ -472,7 +473,7 @@ public class PutParquetTest {
@Test
public void testRowGroupSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "1024 B");
final String filename = "testRowGroupSize-" + System.currentTimeMillis();
@@ -487,7 +488,7 @@ public class PutParquetTest {
@Test
public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}");
+ testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "${row.group.size}");
final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -503,7 +504,7 @@ public class PutParquetTest {
@Test
public void testPageSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.PAGE_SIZE, "1024 B");
final String filename = "testPageGroupSize-" + System.currentTimeMillis();
@@ -518,7 +519,7 @@ public class PutParquetTest {
@Test
public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}");
+ testRunner.setProperty(ParquetUtils.PAGE_SIZE, "${page.size}");
final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -534,7 +535,7 @@ public class PutParquetTest {
@Test
public void testDictionaryPageSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1024 B");
final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis();
@@ -549,7 +550,7 @@ public class PutParquetTest {
@Test
public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
+ testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -565,7 +566,7 @@ public class PutParquetTest {
@Test
public void testMaxPaddingPageSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "1024 B");
final String filename = "testMaxPaddingSize-" + System.currentTimeMillis();
@@ -580,7 +581,7 @@ public class PutParquetTest {
@Test
public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}");
+ testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "${max.padding.size}");
final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
new file mode 100644
index 0000000000..261de9bd2d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Resources;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for ConvertAvroToParquet processor
+ */
+public class TestConvertAvroToParquet {
+
+ private ConvertAvroToParquet processor;
+ private TestRunner runner;
+
+    private List<GenericRecord> records = new ArrayList<>();
+ File tmpAvro = new File("target/test.avro");
+ File tmpParquet = new File("target/test.parquet");
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new ConvertAvroToParquet();
+ runner = TestRunners.newTestRunner(processor);
+
+ Schema schema = new Schema.Parser().parse(Resources.getResource("avro/all-minus-enum.avsc").openStream());
+
+ DataFileWriter