From 76c2c3fee298ae33775ce794fef7f3a0851eb179 Mon Sep 17 00:00:00 2001
From: Bryan Bende
Date: Fri, 23 Aug 2019 16:15:34 -0400
Subject: [PATCH] NIFI-6089 Add Parquet record reader and writer

NIFI-5755 Allow PutParquet processor to specify avro write configuration

Review feedback

Additional review feedback

This closes #3679

Signed-off-by: Mike Thomsen
---
 .../azure/eventhub/ConsumeAzureEventHub.java | 8 +-
 .../eventhub/TestConsumeAzureEventHub.java | 3 +-
 .../CouchbaseRecordLookupService.java | 2 +-
 .../nifi-avro-record-utils/pom.xml | 4 +
 .../SchemaRegistryRecordSetWriter.java | 0
 .../record/ArrayListRecordReader.java | 2 +-
 .../record/CommaSeparatedRecordReader.java | 18 +-
 .../record/MockRecordParser.java | 18 +-
 .../listen/SSLSocketChannelRecordReader.java | 6 +-
 .../listen/SocketChannelRecordReader.java | 9 +-
 .../StandardSocketChannelRecordReader.java | 6 +-
 .../CacheIdSchemaAccessWriter.java | 0
 .../hive/TestPutHive3Streaming.java | 4 +-
 .../kafka/pubsub/ConsumerLease.java | 47 +++--
 .../kafka/pubsub/PublishKafkaRecord_0_10.java | 2 +-
 .../kafka/pubsub/TestPublisherLease.java | 34 ++--
 .../kafka/pubsub/util/MockRecordParser.java | 20 +-
 .../kafka/pubsub/ConsumerLease.java | 2 +-
 .../kafka/pubsub/TestPublisherLease.java | 32 +--
 .../kafka/pubsub/util/MockRecordParser.java | 20 +-
 .../kafka/pubsub/ConsumerLease.java | 2 +-
 .../kafka/pubsub/TestPublisherLease.java | 32 +--
 .../kafka/pubsub/util/MockRecordParser.java | 20 +-
 .../kafka/pubsub/ConsumerLease.java | 2 +-
 .../kafka/pubsub/TestPublisherLease.java | 4 +-
 .../kafka/pubsub/util/MockRecordParser.java | 2 +-
 .../apache/nifi/parquet/ParquetReader.java | 59 ++++++
 .../nifi/parquet/ParquetRecordSetWriter.java | 116 +++++++++++
 .../hadoop}/AvroParquetHDFSRecordReader.java | 2 +-
 .../hadoop}/AvroParquetHDFSRecordWriter.java | 2 +-
 .../parquet/record/ParquetRecordReader.java | 98 ++++++++++
 .../parquet/record/WriteParquetResult.java | 81 ++++++++
 .../parquet/stream/NifiOutputStream.java | 2 +-
 .../parquet/stream/NifiParquetInputFile.java | 53 +++++
 .../parquet/stream/NifiParquetOutputFile.java | 2 +-
 .../stream/NifiSeekableInputStream.java | 71 +++++++
 .../nifi/parquet/utils/ParquetConfig.java | 133 +++++++++++++
 .../parquet/utils/ParquetUtils.java | 184 ++++++++++++++----
 .../parquet/ConvertAvroToParquet.java | 33 ++--
 .../nifi/processors/parquet/FetchParquet.java | 2 +-
 .../nifi/processors/parquet/PutParquet.java | 25 ++-
 ...g.apache.nifi.controller.ControllerService | 17 ++
 .../nifi/parquet/TestParquetReader.java | 119 +++++++++++
 .../parquet/TestParquetRecordSetWriter.java | 159 +++++++++++++++
 .../processors/parquet/PutParquetTest.java | 2 +-
 .../nifi/record/script/ScriptedReader.java | 4 +-
 .../record/script/ScriptedReaderTest.groovy | 13 +-
 .../groovy/test_record_reader_inline.groovy | 2 +-
 .../groovy/test_record_reader_xml.groovy | 2 +-
 .../standard/AbstractRecordProcessor.java | 2 +-
 .../standard/AbstractRouteRecord.java | 2 +-
 .../nifi/processors/standard/ForkRecord.java | 30 +--
 .../processors/standard/ListenTCPRecord.java | 4 +-
 .../processors/standard/ListenUDPRecord.java | 3 +-
 .../processors/standard/PartitionRecord.java | 34 ++--
 .../nifi/processors/standard/QueryRecord.java | 2 +-
 .../nifi/processors/standard/SplitRecord.java | 24 +--
 .../processors/standard/TestForkRecord.java | 21 +-
 .../standard/TestValidateRecord.java | 21 +-
 .../apache/nifi/lookup/RestLookupService.java | 6 +-
 .../serialization/RecordReaderFactory.java | 7 +-
 .../java/org/apache/nifi/avro/AvroReader.java | 2 +-
 .../java/org/apache/nifi/csv/CSVReader.java | 2 +-
 .../java/org/apache/nifi/grok/GrokReader.java | 2 +-
 .../org/apache/nifi/json/JsonPathReader.java | 3 +-
 .../org/apache/nifi/json/JsonTreeReader.java | 3 +-
 .../apache/nifi/syslog/Syslog5424Reader.java | 2 +-
 .../org/apache/nifi/syslog/SyslogReader.java | 2 +-
 .../java/org/apache/nifi/xml/XMLReader.java | 3 +-
 69 files changed, 1340 insertions(+), 315 deletions(-)
 rename nifi-nar-bundles/{nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services => nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils}/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java (100%)
 rename nifi-nar-bundles/{nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services => nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils}/src/main/java/org/apache/nifi/serialization/CacheIdSchemaAccessWriter.java (100%)
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
 rename nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/{processors/parquet/record => parquet/hadoop}/AvroParquetHDFSRecordReader.java (97%)
 rename nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/{processors/parquet/record => parquet/hadoop}/AvroParquetHDFSRecordWriter.java (97%)
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/WriteParquetResult.java
 rename nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/{processors => }/parquet/stream/NifiOutputStream.java (97%)
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java
 rename nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/{processors => }/parquet/stream/NifiParquetOutputFile.java (96%)
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java
 rename nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/{processors => }/parquet/utils/ParquetUtils.java (55%)
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java
 create mode 100644 nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetRecordSetWriter.java

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java index fb7699f8c1..e9bafb0cfa 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/ConsumeAzureEventHub.java @@ -48,6 +48,7 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; @@ -74,7 +75,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import static org.apache.nifi.util.StringUtils.isEmpty; @@ -454,9 +454,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor { try (final OutputStream out = session.write(flowFile)) { for (final EventData eventData : messages) { - - try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) { - final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger); + final byte[] eventDataBytes = eventData.getBytes(); + try (final InputStream in = new ByteArrayInputStream(eventDataBytes)) { + final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, eventDataBytes.length, logger); Record record; while ((record = reader.nextRecord()) != null) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java index 3946cba0bf..6f350937bd 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/eventhub/TestConsumeAzureEventHub.java @@ -59,6 +59,7 @@ import java.util.stream.Stream; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -204,7 +205,7 @@ public class TestConsumeAzureEventHub { final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class); processor.setReaderFactory(readerFactory); final RecordReader reader = mock(RecordReader.class); - when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader); + when(readerFactory.createRecordReader(anyMap(), any(), anyLong(), any())).thenReturn(reader); final List recordList = eventDataList.stream() .map(eventData -> toRecord(new String(eventData.getBytes()))) .collect(Collectors.toList()); diff --git 
a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java index e12eac9959..32179c6139 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseRecordLookupService.java @@ -114,7 +114,7 @@ public class CouchbaseRecordLookupService extends AbstractCouchbaseLookupService recordReaderVariables.put(k, value.toString()); } }); - return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, getLogger())); + return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, -1, getLogger())); } catch (Exception e) { return new Tuple<>(e, null); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml index 7884aae30b..1a5c8dc02f 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/pom.xml @@ -49,6 +49,10 @@ org.apache.nifi nifi-schema-registry-service-api + + org.apache.nifi + nifi-record-serialization-service-api + org.apache.nifi nifi-record diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java similarity index 100% rename from nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java index 5586ede477..ea6c02d61e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/ArrayListRecordReader.java @@ -36,7 +36,7 @@ public class ArrayListRecordReader extends AbstractControllerService implements } @Override - public ArrayListReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) { + public ArrayListReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) { return new ArrayListReader(records, schema); } diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java index 6597b75e4d..560955ccce 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java @@ -17,6 +17,14 @@ package org.apache.nifi.serialization.record; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; + import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -26,14 +34,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.SimpleRecordSchema; - public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory { private int failAfterN; private int recordCount = 0; @@ -51,7 +51,7 @@ public class CommaSeparatedRecordReader extends AbstractControllerService implem } @Override - public RecordReader createRecordReader(Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final List fields = new ArrayList<>(); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java index be4046cb54..9081d008c7 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java @@ -17,14 +17,6 @@ package org.apache.nifi.serialization.record; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import 
org.apache.nifi.schema.access.SchemaNotFoundException; @@ -33,6 +25,14 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.SimpleRecordSchema; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory { private final List records = new ArrayList<>(); private final List fields = new ArrayList<>(); @@ -67,7 +67,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor } @Override - public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException { final Iterator itr = records.iterator(); return new RecordReader() { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java index 12a3f3e9ed..f393419e33 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SSLSocketChannelRecordReader.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.record.listen; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; @@ -30,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.nio.channels.SocketChannel; +import java.util.Collections; /** * Encapsulates an SSLSocketChannel and a RecordReader created for the given channel. 
@@ -54,14 +54,14 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader { } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { + public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { if (recordReader != null) { throw new IllegalStateException("Cannot create RecordReader because already created"); } final InputStream socketIn = new SSLSocketChannelInputStream(sslSocketChannel); final InputStream in = new BufferedInputStream(socketIn); - recordReader = readerFactory.createRecordReader(flowFile, in, logger); + recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger); return recordReader; } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReader.java index b648b7753d..72f931d5cb 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/SocketChannelRecordReader.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.record.listen; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; @@ -32,18 +31,14 @@ import java.net.InetAddress; public interface SocketChannelRecordReader extends Closeable { /** - * Currently a RecordReader can only be created with a FlowFile. Since we won't have a FlowFile at the time - * a connection is accepted, this method will be used to lazily create the RecordReader later. Eventually this - * method should be removed and the reader should be passed in through the constructor. + * Lazily creates the RecordReader. 
* - * - * @param flowFile the flow file we are creating the reader for * @param logger the logger of the component creating the reader * @return a RecordReader * * @throws IllegalStateException if create is called after a reader has already been created */ - RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException; + RecordReader createRecordReader(ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException; /** * @return the RecordReader created by calling createRecordReader, or null if one has not been created yet diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/StandardSocketChannelRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/StandardSocketChannelRecordReader.java index 1e220442c0..a2da2870c8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/StandardSocketChannelRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/record/listen/StandardSocketChannelRecordReader.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.record.listen; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; @@ -27,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; import java.nio.channels.SocketChannel; +import java.util.Collections; /** * Encapsulates a SocketChannel and a RecordReader created for the given channel. 
@@ -48,13 +48,13 @@ public class StandardSocketChannelRecordReader implements SocketChannelRecordRea } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { + public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { if (recordReader != null) { throw new IllegalStateException("Cannot create RecordReader because already created"); } final InputStream in = socketChannel.socket().getInputStream(); - recordReader = readerFactory.createRecordReader(flowFile, in, logger); + recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger); return recordReader; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/CacheIdSchemaAccessWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/CacheIdSchemaAccessWriter.java similarity index 100% rename from nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/CacheIdSchemaAccessWriter.java rename to nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/CacheIdSchemaAccessWriter.java diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java index c82f55c34c..4f49932d0f 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive3-processors/src/test/java/org/apache/nifi/processors/hive/TestPutHive3Streaming.java @@ -171,11 +171,11 @@ public class TestPutHive3Streaming { runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH); MockRecordParser readerFactory = new MockRecordParser() { @Override - public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException { if (failOnCreateReader) { throw new SchemaNotFoundException("test"); } - return super.createRecordReader(variables, in, logger); + return super.createRecordReader(variables, in, inputLength, logger); } }; final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 2eaa58fe17..d9772692b9 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -16,29 +16,6 @@ */ package 
org.apache.nifi.processors.kafka.pubsub; -import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE; -import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS; -import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING; -import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING; - -import java.io.ByteArrayInputStream; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; - -import javax.xml.bind.DatatypeConverter; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -60,6 +37,28 @@ import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import javax.xml.bind.DatatypeConverter; +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; + +import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE; +import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING; + /** * This class represents a lease to access a Kafka Consumer object. The lease is * intended to be obtained from a ConsumerPool. 
The lease is closeable to allow @@ -471,7 +470,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final Record firstRecord; try { - reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger); + reader = readerFactory.createRecordReader(Collections.emptyMap(), in, recordBytes.length, logger); firstRecord = reader.nextRecord(); } catch (final Exception e) { handleParseFailure.accept(consumerRecord, e); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index 6d6af61e2f..bbc4bf1e2a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -330,7 +330,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { @Override public void process(final InputStream in) throws IOException { try { - final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger()); + final RecordReader reader = readerFactory.createRecordReader(attributes, in, flowFile.getSize(), getLogger()); final RecordSet recordSet = reader.createRecordSet(); final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema()); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 1f1d2b9799..f2dcfd399b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -17,22 +17,6 @@ package org.apache.nifi.processors.kafka.pubsub; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -57,6 +41,22 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class TestPublisherLease { private ComponentLog logger; @@ -270,7 +270,7 @@ public class TestPublisherLease { readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); - final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger); final RecordSet recordSet = reader.createRecordSet(); final RecordSchema schema = reader.getSchema(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java index 21527be89f..f8f426be63 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java @@ -17,15 +17,6 @@ package org.apache.nifi.processors.kafka.pubsub.util; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -40,6 +31,15 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory { private final List records = new ArrayList<>(); private final List fields = new ArrayList<>(); @@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor } @Override - public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); return new RecordReader() { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 36e03ea019..bcba8ac9fb 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -520,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordReader reader; try { - reader = readerFactory.createRecordReader(attributes, in, logger); + reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); } catch (final IOException e) { yield(); rollback(topicPartition); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index cc14a263b1..3f726a4196 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -17,21 +17,6 @@ package org.apache.nifi.processors.kafka.pubsub; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -56,6 +41,21 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class TestPublisherLease { private ComponentLog logger; @@ -266,7 +266,7 @@ public class TestPublisherLease { readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); - final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger); final RecordSet recordSet = reader.createRecordSet(); final RecordSchema schema = reader.getSchema(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java index 21527be89f..f8f426be63 100644 --- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java @@ -17,15 +17,6 @@ package org.apache.nifi.processors.kafka.pubsub.util; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -40,6 +31,15 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory { private final List records = new ArrayList<>(); private final List fields = new ArrayList<>(); @@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor } @Override - public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); return new RecordReader() { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index f2a382dcab..04176144f1 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -520,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordReader reader; try { - reader = readerFactory.createRecordReader(attributes, in, logger); + reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); } catch (final IOException e) { yield(); rollback(topicPartition); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index d148a45efd..3f726a4196 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -17,21 +17,6 @@ package org.apache.nifi.processors.kafka.pubsub; -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -56,6 +41,21 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class TestPublisherLease { private ComponentLog logger; @@ -266,7 +266,7 @@ public class TestPublisherLease { readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); - final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger); final RecordSet recordSet = reader.createRecordSet(); final RecordSchema schema = reader.getSchema(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java index 21527be89f..f8f426be63 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java @@ -17,15 +17,6 @@ package org.apache.nifi.processors.kafka.pubsub.util; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -40,6 +31,15 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class MockRecordParser extends 
AbstractControllerService implements RecordReaderFactory { private final List records = new ArrayList<>(); private final List fields = new ArrayList<>(); @@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor } @Override - public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); return new RecordReader() { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 83aa14b95e..77d53a979d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -520,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordReader reader; try { - reader = readerFactory.createRecordReader(attributes, in, logger); + reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); } catch (final IOException e) { yield(); rollback(topicPartition); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java index 6280087d50..141402e9f0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java @@ -22,8 +22,6 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processors.kafka.pubsub.PublishResult; -import org.apache.nifi.processors.kafka.pubsub.PublisherLease; import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; @@ -267,7 +265,7 @@ public class TestPublisherLease { readerService.addSchemaField("name", RecordFieldType.STRING); readerService.addSchemaField("age", RecordFieldType.INT); - final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger); + final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger); final RecordSet recordSet = reader.createRecordSet(); final RecordSchema schema = reader.getSchema(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
index 7ed637cbba..f8f426be63 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
@@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
     }

     @Override
-    public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
+    public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
         final BufferedReader reader = new BufferedReader(new InputStreamReader(in));

         return new RecordReader() {
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
new file mode 100644
index 0000000000..94d2101e67
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.parquet.record.ParquetRecordReader;
+import org.apache.nifi.parquet.utils.ParquetConfig;
+import org.apache.nifi.parquet.utils.ParquetUtils;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
+import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
+
+@Tags({"parquet", "parse", "record", "row", "reader"})
+@CapabilityDescription("Parses Parquet data and returns each Parquet record as a separate Record object. "
+        + "The schema will come from the Parquet data itself.")
+public class ParquetReader extends AbstractControllerService implements RecordReaderFactory {
+
+    @Override
+    public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException {
+        final Configuration conf = new Configuration();
+        final ParquetConfig parquetConfig = createParquetConfig(getConfigurationContext(), variables);
+        applyCommonConfig(conf, parquetConfig);
+        return new ParquetRecordReader(in, inputLength, conf);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(ParquetUtils.AVRO_READ_COMPATIBILITY);
+        return properties;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
new file mode 100644
index 0000000000..85af65fb58
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/ParquetRecordSetWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.nifi.parquet; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.apache.avro.Schema; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.parquet.record.WriteParquetResult; +import org.apache.nifi.parquet.utils.ParquetConfig; +import org.apache.nifi.parquet.utils.ParquetUtils; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter; +import org.apache.nifi.serialization.record.RecordSchema; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig; + +@Tags({"parquet", "result", "set", "writer", "serializer", "record", "recordset", "row"}) +@CapabilityDescription("Writes the contents of a RecordSet in Parquet format.") +public class ParquetRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory { + + public static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Schemas should be cached") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + private LoadingCache compiledAvroSchemaCache; + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + final int cacheSize = context.getProperty(CACHE_SIZE).asInteger(); + compiledAvroSchemaCache = Caffeine.newBuilder() + .maximumSize(cacheSize) + .build(schemaText -> new Schema.Parser().parse(schemaText)); + } + + @Override + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, + final OutputStream out, final Map variables) throws IOException { + final ParquetConfig parquetConfig = createParquetConfig(getConfigurationContext(), variables); + try { + final Schema avroSchema; + try { + if (recordSchema.getSchemaFormat().isPresent() && recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { + final Optional textOption = recordSchema.getSchemaText(); + if (textOption.isPresent()) { + avroSchema = compiledAvroSchemaCache.get(textOption.get()); + } else { + avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); + } + } else { + avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema); + } + } catch (final Exception e) { + throw new SchemaNotFoundException("Failed to compile Avro Schema", e); + } + + return new WriteParquetResult(avroSchema, out, parquetConfig, logger); + + } catch (final SchemaNotFoundException e) { + throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e); + } + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(CACHE_SIZE); + properties.add(ParquetUtils.COMPRESSION_TYPE); + properties.add(ParquetUtils.ROW_GROUP_SIZE); + properties.add(ParquetUtils.PAGE_SIZE); + properties.add(ParquetUtils.DICTIONARY_PAGE_SIZE); + properties.add(ParquetUtils.MAX_PADDING_SIZE); + properties.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING); + properties.add(ParquetUtils.ENABLE_VALIDATION); + properties.add(ParquetUtils.WRITER_VERSION); + properties.add(ParquetUtils.AVRO_WRITE_OLD_LIST_STRUCTURE); + properties.add(ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS); + return properties; + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java similarity index 97% rename from nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java rename to nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java index 8421e3756e..15c0cd554d 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.parquet.record; +package org.apache.nifi.parquet.hadoop; import org.apache.avro.generic.GenericRecord; import org.apache.nifi.avro.AvroTypeUtil; diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordWriter.java similarity index 97% rename from nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java rename to nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordWriter.java index 7ef37b1d99..caa244c6f5 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordWriter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.parquet.record; +package org.apache.nifi.parquet.hadoop; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java new file mode 100644 index 0000000000..6ff319d6b5 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parquet.record; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.parquet.stream.NifiParquetInputFile; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.io.InputFile; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +public class ParquetRecordReader implements RecordReader { + + private GenericRecord lastParquetRecord; + private RecordSchema recordSchema; + + private final InputStream inputStream; + private final InputFile inputFile; + private final ParquetReader parquetReader; + + public ParquetRecordReader(final InputStream inputStream, final long inputLength, final Configuration configuration) throws IOException { + if (inputLength < 0) { + throw new IllegalArgumentException("Invalid input length of '" + inputLength + "'. 
This record reader requires knowing " + + "the length of the InputStream and cannot be used in some cases where the length may not be known."); + } + + this.inputStream = inputStream; + + inputFile = new NifiParquetInputFile(inputStream, inputLength); + parquetReader = AvroParquetReader.builder(inputFile).withConf(configuration).build(); + + // Read the first record so that we can extract the schema + lastParquetRecord = parquetReader.read(); + if (lastParquetRecord == null) { + throw new EOFException("Unable to obtain schema because no records were available"); + } + + // Convert Avro schema to RecordSchema + recordSchema = AvroTypeUtil.createSchema(lastParquetRecord.getSchema()); + } + + @Override + public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException { + // If null then no more records are available + if (lastParquetRecord == null) { + return null; + } + + // Convert the last Parquet GenericRecord to NiFi Record + final Map values = AvroTypeUtil.convertAvroRecordToMap(lastParquetRecord, recordSchema); + final Record record = new MapRecord(recordSchema, values); + + // Read the next record and store for next time + lastParquetRecord = parquetReader.read(); + + // Return the converted record + return record; + } + + @Override + public RecordSchema getSchema() { + return recordSchema; + } + + @Override + public void close() throws IOException { + try { + parquetReader.close(); + } finally { + // ensure the input stream still gets closed + inputStream.close(); + } + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/WriteParquetResult.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/WriteParquetResult.java new file mode 100644 index 0000000000..9f7ce579cd --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/WriteParquetResult.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.parquet.record; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.parquet.stream.NifiParquetOutputFile; +import org.apache.nifi.parquet.utils.ParquetConfig; +import org.apache.nifi.serialization.AbstractRecordSetWriter; +import org.apache.nifi.serialization.record.Record; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.io.OutputFile; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Map; + +import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig; + +public class WriteParquetResult extends AbstractRecordSetWriter { + + private final Schema schema; + private final ParquetWriter<GenericRecord> parquetWriter; + private final ComponentLog componentLogger; + + public WriteParquetResult(final Schema schema, final OutputStream out, final ParquetConfig parquetConfig, final ComponentLog componentLogger) throws IOException { + super(out); + this.schema = schema; + this.componentLogger = componentLogger; + + final Configuration conf = new Configuration(); + final OutputFile outputFile = new NifiParquetOutputFile(out); + + final AvroParquetWriter.Builder<GenericRecord> writerBuilder = + AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema); + applyCommonConfig(writerBuilder, conf, parquetConfig); + parquetWriter = writerBuilder.build(); + } + + @Override + protected Map<String, String> writeRecord(final Record record) throws IOException { + final GenericRecord genericRecord = AvroTypeUtil.createAvroRecord(record, schema); + parquetWriter.write(genericRecord); + return Collections.emptyMap(); + } + + @Override + public void close() throws IOException { + try { + parquetWriter.close(); + } finally { + // ensure the output stream still gets closed + super.close(); + } + } + + @Override + public String getMimeType() { + return "application/parquet"; + } + +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiOutputStream.java similarity index 97% rename from nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java rename to nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiOutputStream.java index acb2dc46a2..6212795566 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiOutputStream.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License.
*/ -package org.apache.nifi.processors.parquet.stream; +package org.apache.nifi.parquet.stream; import org.apache.parquet.io.PositionOutputStream; diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java new file mode 100644 index 0000000000..c4ac722c7b --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetInputFile.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parquet.stream; + +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; + +import java.io.IOException; +import java.io.InputStream; + +public class NifiParquetInputFile implements InputFile { + + private final long length; + private final ByteCountingInputStream input; + + public NifiParquetInputFile(final InputStream input, final long length) { + if (input == null) { + throw new IllegalArgumentException("InputStream is required"); + } + + if (!input.markSupported()) { + throw new IllegalArgumentException("InputStream must support mark/reset to be used with NifiParquetInputFile"); + } + + this.input = new ByteCountingInputStream(input); + this.length = length; + } + + @Override + public long getLength() throws IOException { + return length; + } + + @Override + public SeekableInputStream newStream() throws IOException { + return new NifiSeekableInputStream(input); + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetOutputFile.java similarity index 96% rename from nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java rename to nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetOutputFile.java index d549b7bf11..2125e81a09 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiParquetOutputFile.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.nifi.processors.parquet.stream; +package org.apache.nifi.parquet.stream; import org.apache.parquet.io.OutputFile; import org.apache.parquet.io.PositionOutputStream; diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java new file mode 100644 index 0000000000..cd6b820536 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/stream/NifiSeekableInputStream.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parquet.stream; + +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.parquet.io.DelegatingSeekableInputStream; + +import java.io.IOException; + +public class NifiSeekableInputStream extends DelegatingSeekableInputStream { + + private final ByteCountingInputStream input; + + public NifiSeekableInputStream(final ByteCountingInputStream input) { + super(input); + this.input = input; + this.input.mark(Integer.MAX_VALUE); + } + + @Override + public long getPos() throws IOException { + return input.getBytesConsumed(); + } + + @Override + public void seek(long newPos) throws IOException { + final long currentPos = getPos(); + if (newPos == currentPos) { + return; + } + + if (newPos < currentPos) { + // seeking backwards so first reset back to beginning of the stream then seek + input.reset(); + input.mark(Integer.MAX_VALUE); + } + + // must call getPos() again in case reset was called above + StreamUtils.skip(input, newPos - getPos()); + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public synchronized void mark(int readlimit) { + throw new UnsupportedOperationException("Mark/reset is not supported"); + } + + @Override + public synchronized void reset() throws IOException { + throw new UnsupportedOperationException("Mark/reset is not supported"); + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java new file mode 100644 index 0000000000..5cc4abe298 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetConfig.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parquet.utils; + +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +public class ParquetConfig { + + private Integer rowGroupSize; + private Integer pageSize; + private Integer dictionaryPageSize; + private Integer maxPaddingSize; + private Boolean enableDictionaryEncoding; + private Boolean enableValidation; + private Boolean avroReadCompatibility; + private Boolean avroAddListElementRecords; + private Boolean avroWriteOldListStructure; + private ParquetProperties.WriterVersion writerVersion; + private ParquetFileWriter.Mode writerMode; + private CompressionCodecName compressionCodec; + + public Integer getRowGroupSize() { + return rowGroupSize; + } + + public void setRowGroupSize(Integer rowGroupSize) { + this.rowGroupSize = rowGroupSize; + } + + public Integer getPageSize() { + return pageSize; + } + + public void setPageSize(Integer pageSize) { + this.pageSize = pageSize; + } + + public Integer getDictionaryPageSize() { + return dictionaryPageSize; + } + + public void setDictionaryPageSize(Integer dictionaryPageSize) { + this.dictionaryPageSize = dictionaryPageSize; + } + + public Integer getMaxPaddingSize() { + return maxPaddingSize; + } + + public void setMaxPaddingSize(Integer maxPaddingSize) { + this.maxPaddingSize = maxPaddingSize; + } + + public Boolean getEnableDictionaryEncoding() { + return enableDictionaryEncoding; + } + + public void setEnableDictionaryEncoding(Boolean enableDictionaryEncoding) { + this.enableDictionaryEncoding = enableDictionaryEncoding; + } + + public Boolean getEnableValidation() { + return enableValidation; + } + + public void setEnableValidation(Boolean enableValidation) { + this.enableValidation = enableValidation; + } + + public Boolean getAvroReadCompatibility() { + return avroReadCompatibility; + } + + public void setAvroReadCompatibility(Boolean avroReadCompatibility) { + this.avroReadCompatibility = avroReadCompatibility; + } + + public Boolean getAvroAddListElementRecords() { + return avroAddListElementRecords; + } + + public void setAvroAddListElementRecords(Boolean avroAddListElementRecords) { + this.avroAddListElementRecords = avroAddListElementRecords; + } + + public Boolean getAvroWriteOldListStructure() { + return avroWriteOldListStructure; + } + + public void setAvroWriteOldListStructure(Boolean avroWriteOldListStructure) { + this.avroWriteOldListStructure = avroWriteOldListStructure; + } + + public ParquetProperties.WriterVersion getWriterVersion() { + return writerVersion; + } + + public void setWriterVersion(ParquetProperties.WriterVersion writerVersion) { + this.writerVersion = writerVersion; + } + + public ParquetFileWriter.Mode getWriterMode() { + return writerMode; + } + + public void 
setWriterMode(ParquetFileWriter.Mode writerMode) { + this.writerMode = writerMode; + } + + public CompressionCodecName getCompressionCodec() { + return compressionCodec; + } + + public void setCompressionCodec(CompressionCodecName compressionCodec) { + this.compressionCodec = compressionCodec; + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java similarity index 55% rename from nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java rename to nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java index 7b116c233e..0f00df8e99 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/utils/ParquetUtils.java @@ -16,25 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.nifi.processors.parquet.utils; +package org.apache.nifi.parquet.utils; import org.apache.hadoop.conf.Configuration; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.parquet.PutParquet; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.avro.AvroWriteSupport; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; + import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; public class ParquetUtils { @@ -96,18 +99,36 @@ public class ParquetUtils { .allowableValues(org.apache.parquet.column.ParquetProperties.WriterVersion.values()) .build(); - public static List COMPRESSION_TYPES = getCompressionTypes(); - - public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() - .name("compression-type") - .displayName("Compression Type") - .description("The type of compression for the file being written.") - .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0])) - .defaultValue(COMPRESSION_TYPES.get(0).getValue()) + public static final PropertyDescriptor AVRO_READ_COMPATIBILITY = new PropertyDescriptor.Builder() + .name("avro-read-compatibility") + .displayName("Avro Read Compatibility") + .description("Specifies the value for '" + AvroReadSupport.AVRO_COMPATIBILITY + "' in the underlying Parquet library") + .allowableValues("true", "false") + .defaultValue("true") .required(true) .build(); - public static List getCompressionTypes() { + public static final PropertyDescriptor AVRO_ADD_LIST_ELEMENT_RECORDS = new PropertyDescriptor.Builder() + 
.name("avro-add-list-element-records") + .displayName("Avro Add List Element Records") + .description("Specifies the value for '" + AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS + "' in the underlying Parquet library") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + public static final PropertyDescriptor AVRO_WRITE_OLD_LIST_STRUCTURE = new PropertyDescriptor.Builder() + .name("avro-write-old-list-structure") + .displayName("Avro Write Old List Structure") + .description("Specifies the value for '" + AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE + "' in the underlying Parquet library") + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + public static final List COMPRESSION_TYPES = getCompressionTypes(); + + private static List getCompressionTypes() { final List compressionTypes = new ArrayList<>(); for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) { final String name = compressionCodecName.name(); @@ -116,35 +137,48 @@ public class ParquetUtils { return Collections.unmodifiableList(compressionTypes); } - public static void applyCommonConfig(final ParquetWriter.Builder builder, - final ProcessContext context, - final FlowFile flowFile, - final Configuration conf, - final AbstractProcessor abstractProcessor) { - builder.withConf(conf); + // NOTE: This needs to be named the same as the compression property in AbstractPutHDFSRecord + public static final String COMPRESSION_TYPE_PROP_NAME = "compression-type"; + + public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder() + .name(COMPRESSION_TYPE_PROP_NAME) + .displayName("Compression Type") + .description("The type of compression for the file being written.") + .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0])) + .defaultValue(COMPRESSION_TYPES.get(0).getValue()) + .required(true) + .build(); + + /** + * Creates a ParquetConfig instance from the given PropertyContext. + * + * @param context the PropertyContext from a component + * @param variables an optional set of variables to evaluate EL against, may be null + * @return the ParquetConfig + */ + public static ParquetConfig createParquetConfig(final PropertyContext context, final Map variables) { + final ParquetConfig parquetConfig = new ParquetConfig(); // Required properties boolean overwrite = true; - if(context.getProperty(PutParquet.OVERWRITE).isSet()) + if(context.getProperty(PutParquet.OVERWRITE).isSet()) { overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean(); + } final ParquetFileWriter.Mode mode = overwrite ? 
ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE; - builder.withWriteMode(mode); - - final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName()); - - final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue(); + parquetConfig.setWriterMode(mode); + final String compressionTypeValue = context.getProperty(ParquetUtils.COMPRESSION_TYPE).getValue(); final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue); - builder.withCompressionCodec(codecName); + parquetConfig.setCompressionCodec(codecName); // Optional properties if (context.getProperty(ROW_GROUP_SIZE).isSet()){ try { - final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B); if (rowGroupSize != null) { - builder.withRowGroupSize(rowGroupSize.intValue()); + parquetConfig.setRowGroupSize(rowGroupSize.intValue()); } } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e); @@ -153,9 +187,9 @@ public class ParquetUtils { if (context.getProperty(PAGE_SIZE).isSet()) { try { - final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B); if (pageSize != null) { - builder.withPageSize(pageSize.intValue()); + parquetConfig.setPageSize(pageSize.intValue()); } } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e); @@ -164,9 +198,9 @@ public class ParquetUtils { if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) { try { - final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B); if (dictionaryPageSize != null) { - builder.withDictionaryPageSize(dictionaryPageSize.intValue()); + parquetConfig.setDictionaryPageSize(dictionaryPageSize.intValue()); } } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e); @@ -175,9 +209,9 @@ public class ParquetUtils { if (context.getProperty(MAX_PADDING_SIZE).isSet()) { try { - final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B); + final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B); if (maxPaddingSize != null) { - builder.withMaxPaddingSize(maxPaddingSize.intValue()); + parquetConfig.setMaxPaddingSize(maxPaddingSize.intValue()); } } catch (IllegalArgumentException e) { throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e); @@ -186,17 +220,93 @@ public class ParquetUtils { if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) { final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean(); - builder.withDictionaryEncoding(enableDictionaryEncoding); + 
parquetConfig.setEnableDictionaryEncoding(enableDictionaryEncoding); } if (context.getProperty(ENABLE_VALIDATION).isSet()) { final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean(); - builder.withValidation(enableValidation); + parquetConfig.setEnableValidation(enableValidation); } if (context.getProperty(WRITER_VERSION).isSet()) { final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue(); - builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue)); + parquetConfig.setWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue)); + } + + if (context.getProperty(AVRO_READ_COMPATIBILITY).isSet()) { + final boolean avroReadCompatibility = context.getProperty(AVRO_READ_COMPATIBILITY).asBoolean(); + parquetConfig.setAvroReadCompatibility(avroReadCompatibility); + } + + if (context.getProperty(AVRO_ADD_LIST_ELEMENT_RECORDS).isSet()) { + final boolean avroAddListElementRecords = context.getProperty(AVRO_ADD_LIST_ELEMENT_RECORDS).asBoolean(); + parquetConfig.setAvroAddListElementRecords(avroAddListElementRecords); + } + + if (context.getProperty(AVRO_WRITE_OLD_LIST_STRUCTURE).isSet()) { + final boolean avroWriteOldListStructure = context.getProperty(AVRO_WRITE_OLD_LIST_STRUCTURE).asBoolean(); + parquetConfig.setAvroWriteOldListStructure(avroWriteOldListStructure); + } + + return parquetConfig; + } + + public static void applyCommonConfig(final ParquetWriter.Builder builder, final Configuration conf, + final ParquetConfig parquetConfig) { + builder.withConf(conf); + builder.withCompressionCodec(parquetConfig.getCompressionCodec()); + + // Optional properties + + if (parquetConfig.getRowGroupSize() != null){ + builder.withRowGroupSize(parquetConfig.getRowGroupSize()); + } + + if (parquetConfig.getPageSize() != null) { + builder.withPageSize(parquetConfig.getPageSize()); + } + + if (parquetConfig.getDictionaryPageSize() != null) { + builder.withDictionaryPageSize(parquetConfig.getDictionaryPageSize()); + } + + if (parquetConfig.getMaxPaddingSize() != null) { + builder.withMaxPaddingSize(parquetConfig.getMaxPaddingSize()); + } + + if (parquetConfig.getEnableDictionaryEncoding() != null) { + builder.withDictionaryEncoding(parquetConfig.getEnableDictionaryEncoding()); + } + + if (parquetConfig.getEnableValidation() != null) { + builder.withValidation(parquetConfig.getEnableValidation()); + } + + if (parquetConfig.getWriterVersion() != null) { + builder.withWriterVersion(parquetConfig.getWriterVersion()); + } + + if (parquetConfig.getWriterMode() != null) { + builder.withWriteMode(parquetConfig.getWriterMode()); + } + + applyCommonConfig(conf, parquetConfig); + } + + public static void applyCommonConfig(Configuration conf, ParquetConfig parquetConfig) { + if (parquetConfig.getAvroReadCompatibility() != null) { + conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, + parquetConfig.getAvroReadCompatibility().booleanValue()); + } + + if (parquetConfig.getAvroAddListElementRecords() != null) { + conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, + parquetConfig.getAvroAddListElementRecords().booleanValue()); + } + + if (parquetConfig.getAvroWriteOldListStructure() != null) { + conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, + parquetConfig.getAvroWriteOldListStructure().booleanValue()); } } } diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java 
b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java index 0bcf606f22..eec48afd0e 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java @@ -33,29 +33,33 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.parquet.stream.NifiParquetOutputFile; +import org.apache.nifi.parquet.utils.ParquetConfig; +import org.apache.nifi.parquet.utils.ParquetUtils; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile; -import org.apache.nifi.processors.parquet.utils.ParquetUtils; import org.apache.parquet.avro.AvroParquetWriter; -import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.hadoop.ParquetWriter; -import java.io.InputStream; -import java.io.OutputStream; + import java.io.BufferedInputStream; import java.io.IOException; -import java.util.Map; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.ArrayList; +import java.util.Map; import java.util.Set; -import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig; +import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig; + @Tags({"avro", "parquet", "convert"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. 
If an incoming FlowFile does " @@ -191,12 +195,13 @@ public class ConvertAvroToParquet extends AbstractProcessor { .builder(nifiParquetOutputFile) .withSchema(schema); - Configuration conf = new Configuration(); - conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true); - conf.setBoolean("parquet.avro.add-list-element-records", false); - conf.setBoolean("parquet.avro.write-old-list-structure", false); + final ParquetConfig parquetConfig = createParquetConfig(context, flowFile.getAttributes()); + parquetConfig.setAvroReadCompatibility(true); + parquetConfig.setAvroAddListElementRecords(false); + parquetConfig.setAvroWriteOldListStructure(false); - ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this); + final Configuration conf = new Configuration(); + applyCommonConfig(parquetWriter, conf, parquetConfig); return parquetWriter.build(); } diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java index f4a6875327..aa1eabd654 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java @@ -33,7 +33,7 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord; import org.apache.nifi.processors.hadoop.record.HDFSRecordReader; -import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader; +import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordReader; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.hadoop.ParquetReader; import java.io.IOException; diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java index c2794ac039..d7d55c227b 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java @@ -33,13 +33,14 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.parquet.utils.ParquetConfig; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord; import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; -import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter; -import org.apache.nifi.processors.parquet.utils.ParquetUtils; +import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordWriter; +import org.apache.nifi.parquet.utils.ParquetUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.parquet.avro.AvroParquetWriter; @@ -49,6 +50,9 @@ import 
java.util.ArrayList; import java.util.Collections; import java.util.List; +import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig; +import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig; + @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) @Tags({"put", "parquet", "hadoop", "HDFS", "filesystem", "record"}) @CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " + @@ -79,19 +83,9 @@ public class PutParquet extends AbstractPutHDFSRecord { .defaultValue("false") .build(); - public static final List COMPRESSION_TYPES; - static { - final List compressionTypes = new ArrayList<>(); - for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) { - final String name = compressionCodecName.name(); - compressionTypes.add(new AllowableValue(name, name)); - } - COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes); - } - @Override public List getCompressionTypes(final ProcessorInitializationContext context) { - return COMPRESSION_TYPES; + return ParquetUtils.COMPRESSION_TYPES; } @Override @@ -109,6 +103,8 @@ public class PutParquet extends AbstractPutHDFSRecord { props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING); props.add(ParquetUtils.ENABLE_VALIDATION); props.add(ParquetUtils.WRITER_VERSION); + props.add(ParquetUtils.AVRO_WRITE_OLD_LIST_STRUCTURE); + props.add(ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS); props.add(REMOVE_CRC_FILES); return Collections.unmodifiableList(props); } @@ -123,7 +119,8 @@ public class PutParquet extends AbstractPutHDFSRecord { .builder(path) .withSchema(avroSchema); - ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this); + final ParquetConfig parquetConfig = createParquetConfig(context, flowFile.getAttributes()); + applyCommonConfig(parquetWriter, conf, parquetConfig); return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema); } diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..6b009e1d57 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +org.apache.nifi.parquet.ParquetRecordSetWriter +org.apache.nifi.parquet.ParquetReader \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java new file mode 100644 index 0000000000..da5424217d --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetReader.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.parquet; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.parquet.avro.AvroParquetWriter; +import org.apache.parquet.hadoop.ParquetWriter; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestParquetReader { + + private Map readerFactoryProperties; + private ConfigurationContext readerFactoryConfigContext; + + private ParquetReader parquetReaderFactory; + private ComponentLog componentLog; + + @Before + public void setup() { + readerFactoryProperties = new HashMap<>(); + readerFactoryConfigContext = new MockConfigurationContext(readerFactoryProperties, null); + + parquetReaderFactory = new ParquetReader(); + parquetReaderFactory.abstractStoreConfigContext(readerFactoryConfigContext); + + componentLog = new MockComponentLog("1234", parquetReaderFactory); + } + + @Test + public void testReadUsers() throws IOException, MalformedRecordException { + final Schema schema = getSchema("src/test/resources/avro/user.avsc"); + final File parquetFile = new File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis()); + + // write some users to the parquet file... 
+ final int numUsers = 10; + try (final ParquetWriter writer = createParquetWriter(schema, parquetFile)) { + for (int i=0; i < numUsers; i++) { + final GenericRecord user = new GenericData.Record(schema); + user.put("name", "Bob" + i); + user.put("favorite_number", i); + user.put("favorite_color", "blue" + i); + writer.write(user); + } + } + + // read the parquet file into bytes since we can't use a FileInputStream since it doesn't support mark/reset + final byte[] parquetBytes = IOUtils.toByteArray(parquetFile.toURI()); + + // read the users in using the record reader... + try (final InputStream in = new ByteArrayInputStream(parquetBytes); + final RecordReader recordReader = parquetReaderFactory.createRecordReader( + Collections.emptyMap(), in, parquetFile.length(), componentLog)) { + + int recordCount = 0; + while (recordReader.nextRecord() != null) { + recordCount++; + } + assertEquals(numUsers, recordCount); + } + } + + private Schema getSchema(final String schemaFilePath) throws IOException { + final File schemaFile = new File(schemaFilePath); + final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8); + return new Schema.Parser().parse(schemaString); + } + + private ParquetWriter createParquetWriter(final Schema schema, final File parquetFile) throws IOException { + final Configuration conf = new Configuration(); + final Path parquetPath = new Path(parquetFile.getPath()); + + final ParquetWriter writer = + AvroParquetWriter.builder(parquetPath) + .withSchema(schema) + .withConf(conf) + .build(); + + return writer; + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetRecordSetWriter.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetRecordSetWriter.java new file mode 100644 index 0000000000..637ba227f6 --- /dev/null +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/TestParquetRecordSetWriter.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.parquet; + +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.util.MockComponentLog; +import org.apache.nifi.util.MockConfigurationContext; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestParquetRecordSetWriter { + + private ComponentLog componentLog; + private ParquetRecordSetWriter recordSetWriterFactory; + + @Before + public void setup() { + recordSetWriterFactory = new ParquetRecordSetWriter(); + componentLog = new MockComponentLog("1234", recordSetWriterFactory); + } + + @Test + public void testWriteUsers() throws IOException, SchemaNotFoundException { + initRecordSetWriter("src/test/resources/avro/user.avsc"); + + // get the schema from the writer factory + final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null); + + // write some records + final int numUsers = 10; + final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis()); + + // write some records... + writeUsers(writeSchema, parquetFile, numUsers); + + // read the records back in to verify + verifyParquetRecords(parquetFile, numUsers); + } + + @Test + public void testWriteUsersWhenSchemaFormatNotAvro() throws IOException, SchemaNotFoundException { + initRecordSetWriter("src/test/resources/avro/user.avsc"); + + // get the schema from the writer factory + final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null); + final RecordSchema writeSchemaWithOtherFormat = new SimpleRecordSchema(writeSchema.getFields(), null, "OTHER-FORMAT", SchemaIdentifier.EMPTY); + + // write some records + final int numUsers = 10; + final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis()); + + // write some records... 
+ writeUsers(writeSchemaWithOtherFormat, parquetFile, numUsers); + + // read the records back in to verify + verifyParquetRecords(parquetFile, numUsers); + } + + private void initRecordSetWriter(final String schemaFile) throws IOException { + final ConfigurationContext configurationContext = getConfigurationContextWithSchema(schemaFile); + + // simulate enabling the service + recordSetWriterFactory.onEnabled(configurationContext); + recordSetWriterFactory.storeSchemaWriteStrategy(configurationContext); + recordSetWriterFactory.storeSchemaAccessStrategy(configurationContext); + } + + private void writeUsers(final RecordSchema writeSchema, final File parquetFile, final int numUsers) throws IOException { + try(final OutputStream output = new FileOutputStream(parquetFile); + final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(componentLog, writeSchema, output, Collections.emptyMap())) { + for (int i = 0; i < numUsers; i++) { + final Map userFields = new HashMap<>(); + userFields.put("name", "user" + i); + userFields.put("favorite_number", i); + userFields.put("favorite_color", "blue"); + + final Record userRecord = new MapRecord(writeSchema, userFields); + recordSetWriter.write(userRecord); + } + + recordSetWriter.flush(); + } + } + + private void verifyParquetRecords(final File parquetFile, final int expectedRecordCount) throws IOException { + final Configuration conf = new Configuration(); + final Path path = new Path(parquetFile.getPath()); + final InputFile inputFile = HadoopInputFile.fromPath(path, conf); + + try (final ParquetReader reader = + AvroParquetReader.builder(inputFile).withConf(conf).build()){ + + int recordCount = 0; + while(reader.read() != null) { + recordCount++; + } + assertEquals(expectedRecordCount, recordCount); + } + } + + private ConfigurationContext getConfigurationContextWithSchema(String schemaPath) throws IOException { + final File schemaFile = new File(schemaPath); + final Map properties = createPropertiesWithSchema(schemaFile); + return new MockConfigurationContext(properties, null); + } + + private Map createPropertiesWithSchema(final File schemaFile) throws IOException { + return createPropertiesWithSchema(IOUtils.toString(schemaFile.toURI())); + } + + private Map createPropertiesWithSchema(final String schemaText) { + final Map propertyValues = new HashMap<>(); + propertyValues.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue()); + propertyValues.put(SchemaAccessUtils.SCHEMA_TEXT, schemaText); + return propertyValues; + } +} diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java index 84ff4345e5..c1bcbc7aec 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java @@ -42,7 +42,7 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processors.hadoop.exception.FailureException; import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter; -import org.apache.nifi.processors.parquet.utils.ParquetUtils; +import org.apache.nifi.parquet.utils.ParquetUtils; import 
org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java index 5983f183ee..ac9f2b7722 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java @@ -60,10 +60,10 @@ public class ScriptedReader extends AbstractScriptedRecordFactory variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { if (recordFactory.get() != null) { try { - return recordFactory.get().createRecordReader(variables, in, logger); + return recordFactory.get().createRecordReader(variables, in, inputLength, logger); } catch (UndeclaredThrowableException ute) { throw new IOException(ute.getCause()); } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy index 6142ddd7cf..8be0b6f2d6 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy @@ -97,9 +97,10 @@ class ScriptedReaderTest { recordReaderFactory.initialize initContext recordReaderFactory.onEnabled configurationContext - InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) + byte[] contentBytes = 'Flow file content not used'.bytes + InputStream inStream = new ByteArrayInputStream(contentBytes) - RecordReader recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, logger) + RecordReader recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, contentBytes.length, logger) assertNotNull(recordReader) 3.times { @@ -157,7 +158,7 @@ class ScriptedReaderTest { Map schemaVariables = ['record.tag': 'myRecord'] - InputStream inStream = new ByteArrayInputStream(''' + byte[] contentBytes = ''' 1 @@ -175,9 +176,11 @@ class ScriptedReaderTest { 300 - '''.bytes) + '''.bytes - RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, logger) + InputStream inStream = new ByteArrayInputStream(contentBytes) + + RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, contentBytes.length, logger) assertNotNull(recordReader) 3.times { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy index 83e772062f..783f5202a5 100644 --- 
a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy @@ -57,7 +57,7 @@ class GroovyRecordReader implements RecordReader { class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory { - RecordReader createRecordReader(Map variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + RecordReader createRecordReader(Map variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { return new GroovyRecordReader() } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy index c7296099ad..db2c03b079 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy @@ -67,7 +67,7 @@ class GroovyXmlRecordReaderFactory extends AbstractControllerService implements // Will be set by the ScriptedRecordReaderFactory ConfigurationContext configurationContext - RecordReader createRecordReader(Map variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + RecordReader createRecordReader(Map variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { // Expecting 'schema.text' to have be an JSON array full of objects whose keys are the field name and the value maps to a RecordFieldType def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue() if (!schemaText) return null diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java index f34f58f836..17e69277de 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -123,7 +123,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) { // Get the first record and process it before we create the Record Writer. We do this so that if the Processor // updates the Record's schema, we can provide an updated schema to the Record Writer. 
If there are no records, diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java index 46acf98373..1a31d90eae 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java @@ -121,7 +121,7 @@ public abstract class AbstractRouteRecord extends AbstractProcessor { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) { final Record firstRecord = reader.nextRecord(); if (firstRecord == null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java index 1b719abea0..5d69ea6136 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ForkRecord.java @@ -17,20 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -73,6 +59,20 @@ import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + @SideEffectFree @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -238,7 +238,7 @@ public class ForkRecord extends AbstractProcessor { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) { final RecordSchema writeSchema = 
writerFactory.getSchema(originalAttributes, reader.getSchema()); final OutputStream out = session.write(outFlowFile); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java index 5ea3e6f324..c6b6af9ab1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -334,10 +334,10 @@ public class ListenTCPRecord extends AbstractProcessor { synchronized (socketRecordReader) { FlowFile flowFile = session.create(); try { - // lazily creating the record reader here b/c we need a flow file, eventually shouldn't have to do this + // lazily creating the record reader here RecordReader recordReader = socketRecordReader.getRecordReader(); if (recordReader == null) { - recordReader = socketRecordReader.createRecordReader(flowFile, getLogger()); + recordReader = socketRecordReader.createRecordReader(getLogger()); } Record record; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java index d331a332d4..6a723ea5cc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenUDPRecord.java @@ -242,7 +242,8 @@ public class ListenUDPRecord extends AbstractListenEventProcessor final RecordReader reader; final List records = new ArrayList<>(); try (final InputStream in = new ByteArrayInputStream(event.getData())) { - reader = readerFactory.createRecordReader(Collections.emptyMap(), in, getLogger()); + final long inputLength = event.getData() == null ? 
-1 : event.getData().length; + reader = readerFactory.createRecordReader(Collections.emptyMap(), in, inputLength, getLogger()); Record record; while((record = reader.nextRecord()) != null) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java index 6c0ef0844e..019849ce20 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -17,22 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; -import java.util.stream.Stream; - import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -68,6 +52,22 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; + @EventDriven @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -200,7 +200,7 @@ public class PartitionRecord extends AbstractProcessor { try (final InputStream in = session.read(flowFile)) { final Map originalAttributes = flowFile.getAttributes(); - final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger()); + final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), getLogger()); final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 514d0438b2..5136e67d97 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -286,7 +286,7 @@ public class QueryRecord extends AbstractProcessor { final RecordSchema readerSchema; try (final InputStream rawIn = session.read(original)) { final Map originalAttributes = original.getAttributes(); - final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, 
rawIn, getLogger()); + final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, original.getSize(), getLogger()); readerSchema = reader.getSchema(); writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java index 57f1fbd4b2..b237a0b94b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java @@ -17,17 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; - import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -61,6 +50,17 @@ import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + @EventDriven @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @@ -153,7 +153,7 @@ public class SplitRecord extends AbstractProcessor { session.read(original, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) { final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java index 562b8f04b0..70ec39f0fb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestForkRecord.java @@ -17,15 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.json.JsonRecordSetWriter; @@ -49,6 +40,15 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; 
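The processor-side changes above (QueryRecord, PartitionRecord, SplitRecord, and the others) all follow the same pattern: the FlowFile's size is forwarded as the new inputLength argument to createRecordReader. A minimal sketch of that call pattern, assuming an AbstractProcessor subclass whose readerFactory has already been resolved from a property; the method and variable names here are illustrative and not part of the patch:

    // Assumes the usual imports: org.apache.nifi.serialization.RecordReader,
    // org.apache.nifi.serialization.RecordReaderFactory, org.apache.nifi.serialization.record.Record,
    // org.apache.nifi.flowfile.FlowFile, org.apache.nifi.processor.ProcessSession, java.io.InputStream.
    private void readAllRecords(final ProcessSession session, final FlowFile flowFile,
                                final RecordReaderFactory readerFactory) throws Exception {
        try (final InputStream in = session.read(flowFile);
             final RecordReader reader = readerFactory.createRecordReader(
                     flowFile.getAttributes(), in, flowFile.getSize(), getLogger())) {
            Record record;
            while ((record = reader.nextRecord()) != null) {
                // handle each record; the reader behaves as before,
                // it simply knows the content length up front
            }
        }
    }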
import org.junit.Test; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class TestForkRecord { private final String dateFormat = RecordFieldType.DATE.getDefaultFormat(); @@ -461,7 +461,8 @@ public class TestForkRecord { } @Override - public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) + throws MalformedRecordException, IOException, SchemaNotFoundException { return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java index 52b7968da6..932f6d1181 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java @@ -17,14 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Optional; - import org.apache.nifi.avro.AvroReader; import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.csv.CSVReader; @@ -48,6 +40,14 @@ import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Optional; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -257,9 +257,10 @@ public class TestValidateRecord { runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); runner.enableControllerService(avroReader); final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0); + final byte[] validFlowFileBytes = validFlowFile.toByteArray(); try ( - final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFile.toByteArray()); - final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, runner.getLogger()); + final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFileBytes); + final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, validFlowFileBytes.length, runner.getLogger()); ) { final RecordSchema resultSchema = recordReader.getSchema(); assertEquals(3, resultSchema.getFieldCount()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java 
b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java index 58387efc42..ed8fc50d26 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/RestLookupService.java @@ -302,7 +302,7 @@ public class RestLookupService extends AbstractControllerService implements Reco final Record record; try (final InputStream is = responseBody.byteStream(); final InputStream bufferedIn = new BufferedInputStream(is)) { - record = handleResponse(bufferedIn, context); + record = handleResponse(bufferedIn, responseBody.contentLength(), context); } return Optional.ofNullable(record); @@ -342,9 +342,9 @@ public class RestLookupService extends AbstractControllerService implements Reco return client.newCall(request).execute(); } - private Record handleResponse(InputStream is, Map context) throws SchemaNotFoundException, MalformedRecordException, IOException { + private Record handleResponse(InputStream is, long inputLength, Map context) throws SchemaNotFoundException, MalformedRecordException, IOException { - try (RecordReader reader = readerFactory.createRecordReader(context, is, getLogger())) { + try (RecordReader reader = readerFactory.createRecordReader(context, is, inputLength, getLogger())) { Record record = reader.nextRecord(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java index cb42fed38e..c743ba6d9d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java @@ -37,7 +37,7 @@ public interface RecordReaderFactory extends ControllerService { /** * Create a RecordReader instance to read records from specified InputStream. - * This method calls {@link #createRecordReader(Map, InputStream, ComponentLog)} with Attributes of the specified FlowFile. + * This method calls {@link #createRecordReader(Map, InputStream, long, ComponentLog)} with Attributes of the specified FlowFile. * @param flowFile Attributes of this FlowFile are used to resolve Record Schema via Expression Language dynamically. This can be null. * * @param in InputStream containing Records. @@ -46,7 +46,7 @@ public interface RecordReaderFactory extends ControllerService { * @return Created RecordReader instance */ default RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { - return createRecordReader(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(), in, logger); + return createRecordReader(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(), in, flowFile == null ? 
-1 : flowFile.getSize(), logger); } /** @@ -68,10 +68,11 @@ public interface RecordReaderFactory extends ControllerService { * @param variables A map containing variables which is used to resolve the Record Schema dynamically via Expression Language. * This can be null or empty. * @param in InputStream containing Records. + * @param inputLength the length of the content to read from the InputStream in bytes, a negative value indicates an unknown or unbound size * @param logger A logger bound to a component * * @return Created RecordReader instance */ - RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException; + RecordReader createRecordReader(Map variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index 11379b97df..aae5b5961d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -94,7 +94,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue(); if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { return new AvroReaderWithEmbeddedSchema(in); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index b1a840a6cb..b71971165d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -139,7 +139,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { // Use Mark/Reset of a BufferedInputStream in case we read from 
the Input Stream for the header. in.mark(1024 * 1024); final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 099da2a102..ff24211621 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -267,7 +267,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java index a59bc19325..f16106d61e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java @@ -162,7 +162,8 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) + throws IOException, MalformedRecordException, SchemaNotFoundException { final InputStream bufferedIn = new BufferedInputStream(in); final RecordSchema schema = getSchema(variables, bufferedIn, null); return new JsonPathRowRecordReader(jsonPaths, schema, bufferedIn, logger, dateFormat, timeFormat, timestampFormat); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index c80542120d..d00799c834 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java 
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -105,7 +105,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) + throws IOException, MalformedRecordException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java index 7f29db2d2b..89061eb608 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/Syslog5424Reader.java @@ -143,7 +143,7 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); return new Syslog5424RecordReader(parser, in, schema); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogReader.java index 9b7eda93c0..2dd2b03926 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/syslog/SyslogReader.java @@ -136,7 +136,7 @@ public class SyslogReader extends SchemaRegistryService implements RecordReaderF } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); return new SyslogRecordReader(parser, in, schema); } diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java index d96d397f91..2f800c390f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java @@ -160,7 +160,8 @@ public class XMLReader extends SchemaRegistryService implements RecordReaderFact } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException, MalformedRecordException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) + throws IOException, SchemaNotFoundException, MalformedRecordException { final ConfigurationContext context = getConfigurationContext(); final RecordSchema schema = getSchema(variables, in, null);
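On the implementation side, each reader service above (AvroReader, CSVReader, GrokReader, JsonPathReader, JsonTreeReader, Syslog5424Reader, SyslogReader, XMLReader) widens its createRecordReader signature but can keep streaming exactly as before, because a negative inputLength simply signals an unknown or unbounded size. A minimal sketch of a factory adopting the new contract, modeled on the JSON-based test factory in TestForkRecord; the class name is illustrative and not part of the patch:

    // Assumes imports of org.apache.nifi.controller.AbstractControllerService,
    // org.apache.nifi.json.JsonTreeRowRecordReader, org.apache.nifi.logging.ComponentLog,
    // and the org.apache.nifi.serialization / serialization.record types used below.
    public class SketchJsonReaderFactory extends AbstractControllerService implements RecordReaderFactory {
        private final RecordSchema schema;
        private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
        private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
        private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();

        public SketchJsonReaderFactory(final RecordSchema schema) {
            this.schema = schema;
        }

        @Override
        public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in,
                                               final long inputLength, final ComponentLog logger)
                throws MalformedRecordException, IOException, SchemaNotFoundException {
            // inputLength is the content length in bytes; a negative value means the caller
            // could not determine it. A streaming reader can safely ignore it.
            return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
        }
    }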