NIFI-6089 Add Parquet record reader and writer

NIFI-5755 Allow PutParquet processor to specify avro write configuration
Review feedback
Additional review feedback

This closes #3679

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
Bryan Bende authored 2019-08-23 16:15:34 -04:00; committed by Mike Thomsen
commit 76c2c3fee2 (parent 9071e5baa7)
69 changed files with 1340 additions and 315 deletions
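
The change that threads through nearly every call site below is a new inputLength parameter on RecordReaderFactory.createRecordReader: callers now pass the content length when they know it (eventDataBytes.length, recordBytes.length, flowFile.getSize()) and -1 when they do not. The sketch below is illustrative and not part of this commit; the class and method names are placeholders, and property and relationship handling is omitted.

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

import java.io.InputStream;

public class RecordReadingSketch {

    // Illustrative helper: read every record from a FlowFile with the length-aware signature.
    public static long readAllRecords(final ProcessSession session, final FlowFile flowFile,
                                      final RecordReaderFactory readerFactory,
                                      final ComponentLog logger) throws Exception {
        long count = 0;
        try (final InputStream in = session.read(flowFile);
             final RecordReader reader = readerFactory.createRecordReader(
                     flowFile.getAttributes(), in, flowFile.getSize(), logger)) {
            Record record;
            while ((record = reader.nextRecord()) != null) {
                count++;
            }
        }
        return count;
    }
}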

View File

@ -48,6 +48,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@ -74,7 +75,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import static org.apache.nifi.util.StringUtils.isEmpty;
@ -454,9 +454,9 @@ public class ConsumeAzureEventHub extends AbstractSessionFactoryProcessor {
try (final OutputStream out = session.write(flowFile)) {
for (final EventData eventData : messages) {
try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) {
final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger);
final byte[] eventDataBytes = eventData.getBytes();
try (final InputStream in = new ByteArrayInputStream(eventDataBytes)) {
final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, eventDataBytes.length, logger);
Record record;
while ((record = reader.nextRecord()) != null) {

View File

@ -59,6 +59,7 @@ import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
@ -204,7 +205,7 @@ public class TestConsumeAzureEventHub {
final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class);
processor.setReaderFactory(readerFactory);
final RecordReader reader = mock(RecordReader.class);
when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader);
when(readerFactory.createRecordReader(anyMap(), any(), anyLong(), any())).thenReturn(reader);
final List<Record> recordList = eventDataList.stream()
.map(eventData -> toRecord(new String(eventData.getBytes())))
.collect(Collectors.toList());

View File

@ -114,7 +114,7 @@ public class CouchbaseRecordLookupService extends AbstractCouchbaseLookupService
recordReaderVariables.put(k, value.toString());
}
});
return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, getLogger()));
return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, -1, getLogger()));
} catch (Exception e) {
return new Tuple<>(e, null);
}

View File

@ -49,6 +49,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>

View File

@ -36,7 +36,7 @@ public class ArrayListRecordReader extends AbstractControllerService implements
}
@Override
public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) {
public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) {
return new ArrayListReader(records, schema);
}

View File

@ -17,6 +17,14 @@
package org.apache.nifi.serialization.record;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -26,14 +34,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
private int failAfterN;
private int recordCount = 0;
@ -51,7 +51,7 @@ public class CommaSeparatedRecordReader extends AbstractControllerService implem
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
final List<RecordField> fields = new ArrayList<>();

View File

@ -17,14 +17,6 @@
package org.apache.nifi.serialization.record;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -33,6 +25,14 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
@ -67,7 +67,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final Iterator<Object[]> itr = records.iterator();
return new RecordReader() {

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
@ -30,6 +29,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.Collections;
/**
* Encapsulates an SSLSocketChannel and a RecordReader created for the given channel.
@ -54,14 +54,14 @@ public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}
final InputStream socketIn = new SSLSocketChannelInputStream(sslSocketChannel);
final InputStream in = new BufferedInputStream(socketIn);
recordReader = readerFactory.createRecordReader(flowFile, in, logger);
recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
return recordReader;
}

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@ -32,18 +31,14 @@ import java.net.InetAddress;
public interface SocketChannelRecordReader extends Closeable {
/**
* Currently a RecordReader can only be created with a FlowFile. Since we won't have a FlowFile at the time
* a connection is accepted, this method will be used to lazily create the RecordReader later. Eventually this
* method should be removed and the reader should be passed in through the constructor.
* Lazily creates the RecordReader.
*
*
* @param flowFile the flow file we are creating the reader for
* @param logger the logger of the component creating the reader
* @return a RecordReader
*
* @throws IllegalStateException if create is called after a reader has already been created
*/
RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;
RecordReader createRecordReader(ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;
/**
* @return the RecordReader created by calling createRecordReader, or null if one has not been created yet

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@ -27,6 +26,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.Collections;
/**
* Encapsulates a SocketChannel and a RecordReader created for the given channel.
@ -48,13 +48,13 @@ public class StandardSocketChannelRecordReader implements SocketChannelRecordRea
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}
final InputStream in = socketChannel.socket().getInputStream();
recordReader = readerFactory.createRecordReader(flowFile, in, logger);
recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
return recordReader;
}

View File

@ -171,11 +171,11 @@ public class TestPutHive3Streaming {
runner.setProperty(PutHive3Streaming.HIVE_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
MockRecordParser readerFactory = new MockRecordParser() {
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
if (failOnCreateReader) {
throw new SchemaNotFoundException("test");
}
return super.createRecordReader(variables, in, logger);
return super.createRecordReader(variables, in, inputLength, logger);
}
};
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);

View File

@ -16,29 +16,6 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.xml.bind.DatatypeConverter;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -60,6 +37,28 @@ import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_PARSE_FAILURE;
import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.REL_SUCCESS;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
* intended to be obtained from a ConsumerPool. The lease is closeable to allow
@ -471,7 +470,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final Record firstRecord;
try {
reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
reader = readerFactory.createRecordReader(Collections.emptyMap(), in, recordBytes.length, logger);
firstRecord = reader.nextRecord();
} catch (final Exception e) {
handleParseFailure.accept(consumerRecord, e);

View File

@ -330,7 +330,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
@Override
public void process(final InputStream in) throws IOException {
try {
final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger());
final RecordReader reader = readerFactory.createRecordReader(attributes, in, flowFile.getSize(), getLogger());
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema());

View File

@ -17,22 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -57,6 +41,22 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestPublisherLease {
private ComponentLog logger;
@ -270,7 +270,7 @@ public class TestPublisherLease {
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();

View File

@ -17,15 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -40,6 +31,15 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
return new RecordReader() {

View File

@ -520,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordReader reader;
try {
reader = readerFactory.createRecordReader(attributes, in, logger);
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException e) {
yield();
rollback(topicPartition);

View File

@ -17,21 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -56,6 +41,21 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestPublisherLease {
private ComponentLog logger;
@ -266,7 +266,7 @@ public class TestPublisherLease {
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();

View File

@ -17,15 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -40,6 +31,15 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
return new RecordReader() {

View File

@ -520,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordReader reader;
try {
reader = readerFactory.createRecordReader(attributes, in, logger);
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException e) {
yield();
rollback(topicPartition);

View File

@ -17,21 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@ -56,6 +41,21 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class TestPublisherLease {
private ComponentLog logger;
@ -266,7 +266,7 @@ public class TestPublisherLease {
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();

View File

@ -17,15 +17,6 @@
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@ -40,6 +31,15 @@ import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
return new RecordReader() {

View File

@ -520,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordReader reader;
try {
reader = readerFactory.createRecordReader(attributes, in, logger);
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException e) {
yield();
rollback(topicPartition);

View File

@ -22,8 +22,6 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.kafka.pubsub.PublishResult;
import org.apache.nifi.processors.kafka.pubsub.PublisherLease;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
@ -267,7 +265,7 @@ public class TestPublisherLease {
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), logger);
final RecordReader reader = readerService.createRecordReader(Collections.emptyMap(), new ByteArrayInputStream(exampleInput), -1, logger);
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = reader.getSchema();

View File

@ -63,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));
return new RecordReader() {

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parquet.record.ParquetRecordReader;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
@Tags({"parquet", "parse", "record", "row", "reader"})
@CapabilityDescription("Parses Parquet data and returns each Parquet record as a separate Record object. " +
"The schema will come from the Parquet data itself.")
public class ParquetReader extends AbstractControllerService implements RecordReaderFactory {
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException {
final Configuration conf = new Configuration();
final ParquetConfig parquetConfig = createParquetConfig(getConfigurationContext(), variables);
applyCommonConfig(conf, parquetConfig);
return new ParquetRecordReader(in, inputLength, conf);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(ParquetUtils.AVRO_READ_COMPATIBILITY);
return properties;
}
}
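
The service above only builds a Hadoop Configuration from its properties and delegates to ParquetRecordReader (added later in this commit), which refuses to operate without a real stream length. That is why call sites that can only pass -1, such as the Couchbase lookup service and the socket channel readers earlier in this diff, cannot be backed by this reader. A small illustrative check of that fail-fast path, not part of the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.parquet.record.ParquetRecordReader;

import java.io.ByteArrayInputStream;

public class LengthRequirementSketch {
    public static void main(final String[] args) throws Exception {
        try {
            // -1 means "length unknown"; the Parquet reader rejects it before touching the stream.
            new ParquetRecordReader(new ByteArrayInputStream(new byte[0]), -1, new Configuration());
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}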

View File

@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parquet.record.WriteParquetResult;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
@Tags({"parquet", "result", "set", "writer", "serializer", "record", "recordset", "row"})
@CapabilityDescription("Writes the contents of a RecordSet in Parquet format.")
public class ParquetRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
public static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("cache-size")
.displayName("Cache Size")
.description("Specifies how many Schemas should be cached")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("1000")
.required(true)
.build();
private LoadingCache<String, Schema> compiledAvroSchemaCache;
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
compiledAvroSchemaCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.build(schemaText -> new Schema.Parser().parse(schemaText));
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema,
final OutputStream out, final Map<String, String> variables) throws IOException {
final ParquetConfig parquetConfig = createParquetConfig(getConfigurationContext(), variables);
try {
final Schema avroSchema;
try {
if (recordSchema.getSchemaFormat().isPresent() && recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
final Optional<String> textOption = recordSchema.getSchemaText();
if (textOption.isPresent()) {
avroSchema = compiledAvroSchemaCache.get(textOption.get());
} else {
avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
}
} else {
avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
}
} catch (final Exception e) {
throw new SchemaNotFoundException("Failed to compile Avro Schema", e);
}
return new WriteParquetResult(avroSchema, out, parquetConfig, logger);
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(CACHE_SIZE);
properties.add(ParquetUtils.COMPRESSION_TYPE);
properties.add(ParquetUtils.ROW_GROUP_SIZE);
properties.add(ParquetUtils.PAGE_SIZE);
properties.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
properties.add(ParquetUtils.MAX_PADDING_SIZE);
properties.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
properties.add(ParquetUtils.ENABLE_VALIDATION);
properties.add(ParquetUtils.WRITER_VERSION);
properties.add(ParquetUtils.AVRO_WRITE_OLD_LIST_STRUCTURE);
properties.add(ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS);
return properties;
}
}
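
The Caffeine cache above avoids re-parsing identical Avro schema text on every createWriter call. The same pattern in isolation, with an illustrative schema string (nothing here is part of the commit):

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.avro.Schema;

public class SchemaCacheSketch {
    public static void main(final String[] args) {
        final LoadingCache<String, Schema> cache = Caffeine.newBuilder()
                .maximumSize(1000)
                .build(schemaText -> new Schema.Parser().parse(schemaText));

        // Repeated lookups with the same schema text reuse the compiled Schema.
        final String schemaText =
                "{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}";
        final Schema first = cache.get(schemaText);
        final Schema second = cache.get(schemaText);
        System.out.println(first == second); // true: the compiled schema came from the cache
    }
}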

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.parquet.record;
package org.apache.nifi.parquet.hadoop;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.avro.AvroTypeUtil;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.parquet.record;
package org.apache.nifi.parquet.hadoop;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet.record;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.parquet.stream.NifiParquetInputFile;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.InputFile;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
public class ParquetRecordReader implements RecordReader {
private GenericRecord lastParquetRecord;
private RecordSchema recordSchema;
private final InputStream inputStream;
private final InputFile inputFile;
private final ParquetReader<GenericRecord> parquetReader;
public ParquetRecordReader(final InputStream inputStream, final long inputLength, final Configuration configuration) throws IOException {
if (inputLength < 0) {
throw new IllegalArgumentException("Invalid input length of '" + inputLength + "'. This record reader requires knowing " +
"the length of the InputStream and cannot be used in some cases where the length may not be known.");
}
this.inputStream = inputStream;
inputFile = new NifiParquetInputFile(inputStream, inputLength);
parquetReader = AvroParquetReader.<GenericRecord>builder(inputFile).withConf(configuration).build();
// Read the first record so that we can extract the schema
lastParquetRecord = parquetReader.read();
if (lastParquetRecord == null) {
throw new EOFException("Unable to obtain schema because no records were available");
}
// Convert Avro schema to RecordSchema
recordSchema = AvroTypeUtil.createSchema(lastParquetRecord.getSchema());
}
@Override
public Record nextRecord(boolean coerceTypes, boolean dropUnknownFields) throws IOException {
// If null then no more records are available
if (lastParquetRecord == null) {
return null;
}
// Convert the last Parquet GenericRecord to NiFi Record
final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(lastParquetRecord, recordSchema);
final Record record = new MapRecord(recordSchema, values);
// Read the next record and store for next time
lastParquetRecord = parquetReader.read();
// Return the converted record
return record;
}
@Override
public RecordSchema getSchema() {
return recordSchema;
}
@Override
public void close() throws IOException {
try {
parquetReader.close();
} finally {
// ensure the input stream still gets closed
inputStream.close();
}
}
}
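
A sketch of exercising ParquetRecordReader outside of NiFi, assuming an existing Parquet file on disk; the path is hypothetical and not part of the commit. The stream is buffered because NifiParquetInputFile requires mark/reset support, and the exact length is needed so the Parquet footer can be located; note the constructor above throws EOFException for a file with no records.

import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.parquet.record.ParquetRecordReader;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.record.Record;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

public class ParquetReadSketch {
    public static void main(final String[] args) throws Exception {
        final File parquetFile = new File("example.parquet"); // hypothetical input file
        try (final InputStream in = new BufferedInputStream(new FileInputStream(parquetFile))) {
            // BufferedInputStream provides the mark/reset support NifiParquetInputFile requires.
            final RecordReader reader = new ParquetRecordReader(in, parquetFile.length(), new Configuration());
            System.out.println("Schema: " + reader.getSchema());
            long count = 0;
            Record record;
            while ((record = reader.nextRecord()) != null) {
                count++;
            }
            reader.close();
            System.out.println("Read " + count + " records");
        }
    }
}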

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet.record;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.parquet.stream.NifiParquetOutputFile;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.io.OutputFile;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.Map;
import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
public class WriteParquetResult extends AbstractRecordSetWriter {
private final Schema schema;
private final ParquetWriter<GenericRecord> parquetWriter;
private final ComponentLog componentLogger;
public WriteParquetResult(final Schema schema, final OutputStream out, final ParquetConfig parquetConfig, final ComponentLog componentLogger) throws IOException {
super(out);
this.schema = schema;
this.componentLogger = componentLogger;
final Configuration conf = new Configuration();
final OutputFile outputFile = new NifiParquetOutputFile(out);
final AvroParquetWriter.Builder<GenericRecord> writerBuilder =
AvroParquetWriter.<GenericRecord>builder(outputFile).withSchema(schema);
applyCommonConfig(writerBuilder, conf, parquetConfig);
parquetWriter = writerBuilder.build();
}
@Override
protected Map<String, String> writeRecord(final Record record) throws IOException {
final GenericRecord genericRecord = AvroTypeUtil.createAvroRecord(record, schema);
parquetWriter.write(genericRecord);
return Collections.emptyMap();
}
@Override
public void close() throws IOException {
try {
parquetWriter.close();
} finally {
// ensure the output stream still gets closed
super.close();
}
}
@Override
public String getMimeType() {
return "application/parquet";
}
}
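
A hedged sketch of driving WriteParquetResult directly, assuming the standard RecordSetWriter contract of write(Record) followed by close(), and assuming a ParquetConfig needs only the writer mode and compression codec populated (the two values createParquetConfig always sets, shown later in ParquetUtils). The schema, field values, output path, and the null logger are all illustrative.

import org.apache.avro.Schema;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.parquet.record.WriteParquetResult;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

public class ParquetWriteSketch {
    public static void main(final String[] args) throws Exception {
        // Illustrative Avro schema with a single string field.
        final Schema avroSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);

        // Minimal config, mirroring the two values createParquetConfig always sets.
        final ParquetConfig parquetConfig = new ParquetConfig();
        parquetConfig.setWriterMode(ParquetFileWriter.Mode.OVERWRITE);
        parquetConfig.setCompressionCodec(CompressionCodecName.UNCOMPRESSED);

        try (final OutputStream out = new FileOutputStream("example.parquet")) { // hypothetical output path
            // Passing null for the ComponentLog is an assumption made for this sketch.
            final WriteParquetResult writer = new WriteParquetResult(avroSchema, out, parquetConfig, null);

            final Map<String, Object> values = new HashMap<>();
            values.put("name", "nifi");
            writer.write(new MapRecord(recordSchema, values)); // assumed write(Record) from AbstractRecordSetWriter
            writer.close();
        }
    }
}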

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet.stream;
package org.apache.nifi.parquet.stream;
import org.apache.parquet.io.PositionOutputStream;

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet.stream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
import java.io.IOException;
import java.io.InputStream;
public class NifiParquetInputFile implements InputFile {
private final long length;
private final ByteCountingInputStream input;
public NifiParquetInputFile(final InputStream input, final long length) {
if (input == null) {
throw new IllegalArgumentException("InputStream is required");
}
if (!input.markSupported()) {
throw new IllegalArgumentException("InputStream must support mark/reset to be used with NifiParquetInputFile");
}
this.input = new ByteCountingInputStream(input);
this.length = length;
}
@Override
public long getLength() throws IOException {
return length;
}
@Override
public SeekableInputStream newStream() throws IOException {
return new NifiSeekableInputStream(input);
}
}
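
NifiParquetInputFile is what lets the Parquet library read from a length-known NiFi stream instead of a Hadoop Path. The sketch below mirrors the ParquetRecordReader constructor further down, for cases where raw Avro GenericRecords are wanted; the file path is hypothetical and nothing here is part of the commit.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.parquet.stream.NifiParquetInputFile;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.io.InputFile;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;

public class InputFileSketch {
    public static void main(final String[] args) throws Exception {
        final File file = new File("example.parquet"); // hypothetical input file
        try (final InputStream in = new BufferedInputStream(new FileInputStream(file))) {
            final InputFile inputFile = new NifiParquetInputFile(in, file.length());
            try (final ParquetReader<GenericRecord> reader =
                         AvroParquetReader.<GenericRecord>builder(inputFile).withConf(new Configuration()).build()) {
                GenericRecord record;
                while ((record = reader.read()) != null) {
                    System.out.println(record);
                }
            }
        }
    }
}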

View File

@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet.stream;
package org.apache.nifi.parquet.stream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet.stream;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import java.io.IOException;
public class NifiSeekableInputStream extends DelegatingSeekableInputStream {
private final ByteCountingInputStream input;
public NifiSeekableInputStream(final ByteCountingInputStream input) {
super(input);
this.input = input;
this.input.mark(Integer.MAX_VALUE);
}
@Override
public long getPos() throws IOException {
return input.getBytesConsumed();
}
@Override
public void seek(long newPos) throws IOException {
final long currentPos = getPos();
if (newPos == currentPos) {
return;
}
if (newPos < currentPos) {
// seeking backwards so first reset back to beginning of the stream then seek
input.reset();
input.mark(Integer.MAX_VALUE);
}
// must call getPos() again in case reset was called above
StreamUtils.skip(input, newPos - getPos());
}
@Override
public boolean markSupported() {
return false;
}
@Override
public synchronized void mark(int readlimit) {
throw new UnsupportedOperationException("Mark/reset is not supported");
}
@Override
public synchronized void reset() throws IOException {
throw new UnsupportedOperationException("Mark/reset is not supported");
}
}
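
The backwards seek above works by resetting to the mark placed at construction time and then skipping forward again, with ByteCountingInputStream supplying the absolute position. A small illustrative exercise over in-memory data, not part of the commit; expected outputs are noted in the comments.

import org.apache.nifi.parquet.stream.NifiSeekableInputStream;
import org.apache.nifi.stream.io.ByteCountingInputStream;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class SeekSketch {
    public static void main(final String[] args) throws Exception {
        final byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
        // ByteArrayInputStream supports mark/reset, which the seek-backwards path relies on.
        final ByteCountingInputStream counting = new ByteCountingInputStream(new ByteArrayInputStream(data));
        try (final NifiSeekableInputStream seekable = new NifiSeekableInputStream(counting)) {
            seekable.seek(7);
            System.out.println((char) seekable.read()); // '7', position is now 8
            seekable.seek(2);                           // backwards: reset to 0, then skip 2
            System.out.println((char) seekable.read()); // '2'
            System.out.println(seekable.getPos());      // 3
        }
    }
}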

View File

@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet.utils;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class ParquetConfig {
private Integer rowGroupSize;
private Integer pageSize;
private Integer dictionaryPageSize;
private Integer maxPaddingSize;
private Boolean enableDictionaryEncoding;
private Boolean enableValidation;
private Boolean avroReadCompatibility;
private Boolean avroAddListElementRecords;
private Boolean avroWriteOldListStructure;
private ParquetProperties.WriterVersion writerVersion;
private ParquetFileWriter.Mode writerMode;
private CompressionCodecName compressionCodec;
public Integer getRowGroupSize() {
return rowGroupSize;
}
public void setRowGroupSize(Integer rowGroupSize) {
this.rowGroupSize = rowGroupSize;
}
public Integer getPageSize() {
return pageSize;
}
public void setPageSize(Integer pageSize) {
this.pageSize = pageSize;
}
public Integer getDictionaryPageSize() {
return dictionaryPageSize;
}
public void setDictionaryPageSize(Integer dictionaryPageSize) {
this.dictionaryPageSize = dictionaryPageSize;
}
public Integer getMaxPaddingSize() {
return maxPaddingSize;
}
public void setMaxPaddingSize(Integer maxPaddingSize) {
this.maxPaddingSize = maxPaddingSize;
}
public Boolean getEnableDictionaryEncoding() {
return enableDictionaryEncoding;
}
public void setEnableDictionaryEncoding(Boolean enableDictionaryEncoding) {
this.enableDictionaryEncoding = enableDictionaryEncoding;
}
public Boolean getEnableValidation() {
return enableValidation;
}
public void setEnableValidation(Boolean enableValidation) {
this.enableValidation = enableValidation;
}
public Boolean getAvroReadCompatibility() {
return avroReadCompatibility;
}
public void setAvroReadCompatibility(Boolean avroReadCompatibility) {
this.avroReadCompatibility = avroReadCompatibility;
}
public Boolean getAvroAddListElementRecords() {
return avroAddListElementRecords;
}
public void setAvroAddListElementRecords(Boolean avroAddListElementRecords) {
this.avroAddListElementRecords = avroAddListElementRecords;
}
public Boolean getAvroWriteOldListStructure() {
return avroWriteOldListStructure;
}
public void setAvroWriteOldListStructure(Boolean avroWriteOldListStructure) {
this.avroWriteOldListStructure = avroWriteOldListStructure;
}
public ParquetProperties.WriterVersion getWriterVersion() {
return writerVersion;
}
public void setWriterVersion(ParquetProperties.WriterVersion writerVersion) {
this.writerVersion = writerVersion;
}
public ParquetFileWriter.Mode getWriterMode() {
return writerMode;
}
public void setWriterMode(ParquetFileWriter.Mode writerMode) {
this.writerMode = writerMode;
}
public CompressionCodecName getCompressionCodec() {
return compressionCodec;
}
public void setCompressionCodec(CompressionCodecName compressionCodec) {
this.compressionCodec = compressionCodec;
}
}

View File

@ -16,25 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.parquet.utils;
package org.apache.nifi.parquet.utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.parquet.PutParquet;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class ParquetUtils {
@ -96,18 +99,36 @@ public class ParquetUtils {
.allowableValues(org.apache.parquet.column.ParquetProperties.WriterVersion.values())
.build();
public static List<AllowableValue> COMPRESSION_TYPES = getCompressionTypes();
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name("compression-type")
.displayName("Compression Type")
.description("The type of compression for the file being written.")
.allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
.defaultValue(COMPRESSION_TYPES.get(0).getValue())
public static final PropertyDescriptor AVRO_READ_COMPATIBILITY = new PropertyDescriptor.Builder()
.name("avro-read-compatibility")
.displayName("Avro Read Compatibility")
.description("Specifies the value for '" + AvroReadSupport.AVRO_COMPATIBILITY + "' in the underlying Parquet library")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static List<AllowableValue> getCompressionTypes() {
public static final PropertyDescriptor AVRO_ADD_LIST_ELEMENT_RECORDS = new PropertyDescriptor.Builder()
.name("avro-add-list-element-records")
.displayName("Avro Add List Element Records")
.description("Specifies the value for '" + AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS + "' in the underlying Parquet library")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final PropertyDescriptor AVRO_WRITE_OLD_LIST_STRUCTURE = new PropertyDescriptor.Builder()
.name("avro-write-old-list-structure")
.displayName("Avro Write Old List Structure")
.description("Specifies the value for '" + AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE + "' in the underlying Parquet library")
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
.build();
public static final List<AllowableValue> COMPRESSION_TYPES = getCompressionTypes();
private static List<AllowableValue> getCompressionTypes() {
final List<AllowableValue> compressionTypes = new ArrayList<>();
for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
final String name = compressionCodecName.name();
@ -116,35 +137,48 @@ public class ParquetUtils {
return Collections.unmodifiableList(compressionTypes);
}
public static void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder,
final ProcessContext context,
final FlowFile flowFile,
final Configuration conf,
final AbstractProcessor abstractProcessor) {
builder.withConf(conf);
// NOTE: This needs to be named the same as the compression property in AbstractPutHDFSRecord
public static final String COMPRESSION_TYPE_PROP_NAME = "compression-type";
public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
.name(COMPRESSION_TYPE_PROP_NAME)
.displayName("Compression Type")
.description("The type of compression for the file being written.")
.allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
.defaultValue(COMPRESSION_TYPES.get(0).getValue())
.required(true)
.build();
/**
* Creates a ParquetConfig instance from the given PropertyContext.
*
* @param context the PropertyContext from a component
* @param variables an optional set of variables to evaluate EL against, may be null
* @return the ParquetConfig
*/
public static ParquetConfig createParquetConfig(final PropertyContext context, final Map<String, String> variables) {
final ParquetConfig parquetConfig = new ParquetConfig();
// Required properties
boolean overwrite = true;
if(context.getProperty(PutParquet.OVERWRITE).isSet())
if(context.getProperty(PutParquet.OVERWRITE).isSet()) {
overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean();
}
final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
builder.withWriteMode(mode);
final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName());
final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
parquetConfig.setWriterMode(mode);
final String compressionTypeValue = context.getProperty(ParquetUtils.COMPRESSION_TYPE).getValue();
final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
builder.withCompressionCodec(codecName);
parquetConfig.setCompressionCodec(codecName);
// Optional properties
if (context.getProperty(ROW_GROUP_SIZE).isSet()){
try {
final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B);
if (rowGroupSize != null) {
builder.withRowGroupSize(rowGroupSize.intValue());
parquetConfig.setRowGroupSize(rowGroupSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
@ -153,9 +187,9 @@ public class ParquetUtils {
if (context.getProperty(PAGE_SIZE).isSet()) {
try {
final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B);
if (pageSize != null) {
builder.withPageSize(pageSize.intValue());
parquetConfig.setPageSize(pageSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
@ -164,9 +198,9 @@ public class ParquetUtils {
if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
try {
final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B);
if (dictionaryPageSize != null) {
builder.withDictionaryPageSize(dictionaryPageSize.intValue());
parquetConfig.setDictionaryPageSize(dictionaryPageSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
@ -175,9 +209,9 @@ public class ParquetUtils {
if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
try {
final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(variables).asDataSize(DataUnit.B);
if (maxPaddingSize != null) {
builder.withMaxPaddingSize(maxPaddingSize.intValue());
parquetConfig.setMaxPaddingSize(maxPaddingSize.intValue());
}
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
@ -186,17 +220,93 @@ public class ParquetUtils {
if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
builder.withDictionaryEncoding(enableDictionaryEncoding);
parquetConfig.setEnableDictionaryEncoding(enableDictionaryEncoding);
}
if (context.getProperty(ENABLE_VALIDATION).isSet()) {
final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
builder.withValidation(enableValidation);
parquetConfig.setEnableValidation(enableValidation);
}
if (context.getProperty(WRITER_VERSION).isSet()) {
final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
parquetConfig.setWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
}
if (context.getProperty(AVRO_READ_COMPATIBILITY).isSet()) {
final boolean avroReadCompatibility = context.getProperty(AVRO_READ_COMPATIBILITY).asBoolean();
parquetConfig.setAvroReadCompatibility(avroReadCompatibility);
}
if (context.getProperty(AVRO_ADD_LIST_ELEMENT_RECORDS).isSet()) {
final boolean avroAddListElementRecords = context.getProperty(AVRO_ADD_LIST_ELEMENT_RECORDS).asBoolean();
parquetConfig.setAvroAddListElementRecords(avroAddListElementRecords);
}
if (context.getProperty(AVRO_WRITE_OLD_LIST_STRUCTURE).isSet()) {
final boolean avroWriteOldListStructure = context.getProperty(AVRO_WRITE_OLD_LIST_STRUCTURE).asBoolean();
parquetConfig.setAvroWriteOldListStructure(avroWriteOldListStructure);
}
return parquetConfig;
}
public static void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final Configuration conf,
final ParquetConfig parquetConfig) {
builder.withConf(conf);
builder.withCompressionCodec(parquetConfig.getCompressionCodec());
// Optional properties
if (parquetConfig.getRowGroupSize() != null){
builder.withRowGroupSize(parquetConfig.getRowGroupSize());
}
if (parquetConfig.getPageSize() != null) {
builder.withPageSize(parquetConfig.getPageSize());
}
if (parquetConfig.getDictionaryPageSize() != null) {
builder.withDictionaryPageSize(parquetConfig.getDictionaryPageSize());
}
if (parquetConfig.getMaxPaddingSize() != null) {
builder.withMaxPaddingSize(parquetConfig.getMaxPaddingSize());
}
if (parquetConfig.getEnableDictionaryEncoding() != null) {
builder.withDictionaryEncoding(parquetConfig.getEnableDictionaryEncoding());
}
if (parquetConfig.getEnableValidation() != null) {
builder.withValidation(parquetConfig.getEnableValidation());
}
if (parquetConfig.getWriterVersion() != null) {
builder.withWriterVersion(parquetConfig.getWriterVersion());
}
if (parquetConfig.getWriterMode() != null) {
builder.withWriteMode(parquetConfig.getWriterMode());
}
applyCommonConfig(conf, parquetConfig);
}
public static void applyCommonConfig(Configuration conf, ParquetConfig parquetConfig) {
if (parquetConfig.getAvroReadCompatibility() != null) {
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY,
parquetConfig.getAvroReadCompatibility().booleanValue());
}
if (parquetConfig.getAvroAddListElementRecords() != null) {
conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS,
parquetConfig.getAvroAddListElementRecords().booleanValue());
}
if (parquetConfig.getAvroWriteOldListStructure() != null) {
conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
parquetConfig.getAvroWriteOldListStructure().booleanValue());
}
}
}
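
Taken together, a writer-side caller is expected to build a ParquetConfig once and then hand it to applyCommonConfig along with the Hadoop Configuration and the Parquet builder. The following is a minimal usage sketch, not code from this commit: the context, flowFile, avroSchema, and parquetPath variables are assumed to come from the calling processor, and the imports match those already shown for this file (AvroParquetWriter, ParquetWriter, Configuration, GenericRecord).

// Illustrative sketch only: wiring createParquetConfig and applyCommonConfig
// together the way PutParquet and ConvertAvroToParquet do after this change.
// 'context', 'flowFile', 'avroSchema', and 'parquetPath' are assumed inputs.
final ParquetConfig parquetConfig = ParquetUtils.createParquetConfig(context, flowFile.getAttributes());

final Configuration conf = new Configuration();
final AvroParquetWriter.Builder<GenericRecord> builder = AvroParquetWriter
        .<GenericRecord>builder(parquetPath)
        .withSchema(avroSchema);

// Copies the compression codec, row group/page/dictionary/padding sizes, writer
// version and write mode onto the builder, and the Avro read/write flags onto conf.
ParquetUtils.applyCommonConfig(builder, conf, parquetConfig);

try (final ParquetWriter<GenericRecord> writer = builder.build()) {
    // write GenericRecords here
}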

View File

@ -33,29 +33,33 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.parquet.stream.NifiParquetOutputFile;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetWriter;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.util.Map;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
@Tags({"avro", "parquet", "convert"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. If an incoming FlowFile does "
@ -191,12 +195,13 @@ public class ConvertAvroToParquet extends AbstractProcessor {
.<GenericRecord>builder(nifiParquetOutputFile)
.withSchema(schema);
Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
conf.setBoolean("parquet.avro.add-list-element-records", false);
conf.setBoolean("parquet.avro.write-old-list-structure", false);
final ParquetConfig parquetConfig = createParquetConfig(context, flowFile.getAttributes());
parquetConfig.setAvroReadCompatibility(true);
parquetConfig.setAvroAddListElementRecords(false);
parquetConfig.setAvroWriteOldListStructure(false);
ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
final Configuration conf = new Configuration();
applyCommonConfig(parquetWriter, conf, parquetConfig);
return parquetWriter.build();
}

View File

@ -33,7 +33,7 @@ import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.AbstractFetchHDFSRecord;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordReader;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;

View File

@ -33,13 +33,14 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.parquet.utils.ParquetConfig;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.parquet.hadoop.AvroParquetHDFSRecordWriter;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.avro.AvroParquetWriter;
@ -49,6 +50,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.apache.nifi.parquet.utils.ParquetUtils.createParquetConfig;
import static org.apache.nifi.parquet.utils.ParquetUtils.applyCommonConfig;
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"put", "parquet", "hadoop", "HDFS", "filesystem", "record"})
@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
@ -79,19 +83,9 @@ public class PutParquet extends AbstractPutHDFSRecord {
.defaultValue("false")
.build();
public static final List<AllowableValue> COMPRESSION_TYPES;
static {
final List<AllowableValue> compressionTypes = new ArrayList<>();
for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
final String name = compressionCodecName.name();
compressionTypes.add(new AllowableValue(name, name));
}
COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
}
@Override
public List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context) {
return COMPRESSION_TYPES;
return ParquetUtils.COMPRESSION_TYPES;
}
@Override
@ -109,6 +103,8 @@ public class PutParquet extends AbstractPutHDFSRecord {
props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
props.add(ParquetUtils.ENABLE_VALIDATION);
props.add(ParquetUtils.WRITER_VERSION);
props.add(ParquetUtils.AVRO_WRITE_OLD_LIST_STRUCTURE);
props.add(ParquetUtils.AVRO_ADD_LIST_ELEMENT_RECORDS);
props.add(REMOVE_CRC_FILES);
return Collections.unmodifiableList(props);
}
@ -123,7 +119,8 @@ public class PutParquet extends AbstractPutHDFSRecord {
.<GenericRecord>builder(path)
.withSchema(avroSchema);
ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
final ParquetConfig parquetConfig = createParquetConfig(context, flowFile.getAttributes());
applyCommonConfig(parquetWriter, conf, parquetConfig);
return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.parquet.ParquetRecordSetWriter
org.apache.nifi.parquet.ParquetReader

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class TestParquetReader {
private Map<PropertyDescriptor,String> readerFactoryProperties;
private ConfigurationContext readerFactoryConfigContext;
private ParquetReader parquetReaderFactory;
private ComponentLog componentLog;
@Before
public void setup() {
readerFactoryProperties = new HashMap<>();
readerFactoryConfigContext = new MockConfigurationContext(readerFactoryProperties, null);
parquetReaderFactory = new ParquetReader();
parquetReaderFactory.abstractStoreConfigContext(readerFactoryConfigContext);
componentLog = new MockComponentLog("1234", parquetReaderFactory);
}
@Test
public void testReadUsers() throws IOException, MalformedRecordException {
final Schema schema = getSchema("src/test/resources/avro/user.avsc");
final File parquetFile = new File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());
// write some users to the parquet file...
final int numUsers = 10;
try (final ParquetWriter<GenericRecord> writer = createParquetWriter(schema, parquetFile)) {
for (int i=0; i < numUsers; i++) {
final GenericRecord user = new GenericData.Record(schema);
user.put("name", "Bob" + i);
user.put("favorite_number", i);
user.put("favorite_color", "blue" + i);
writer.write(user);
}
}
// read the parquet file into bytes since a FileInputStream can't be used here because it doesn't support mark/reset
final byte[] parquetBytes = IOUtils.toByteArray(parquetFile.toURI());
// read the users in using the record reader...
try (final InputStream in = new ByteArrayInputStream(parquetBytes);
final RecordReader recordReader = parquetReaderFactory.createRecordReader(
Collections.emptyMap(), in, parquetFile.length(), componentLog)) {
int recordCount = 0;
while (recordReader.nextRecord() != null) {
recordCount++;
}
assertEquals(numUsers, recordCount);
}
}
private Schema getSchema(final String schemaFilePath) throws IOException {
final File schemaFile = new File(schemaFilePath);
final String schemaString = IOUtils.toString(new FileInputStream(schemaFile), StandardCharsets.UTF_8);
return new Schema.Parser().parse(schemaString);
}
private ParquetWriter<GenericRecord> createParquetWriter(final Schema schema, final File parquetFile) throws IOException {
final Configuration conf = new Configuration();
final Path parquetPath = new Path(parquetFile.getPath());
final ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(parquetPath)
.withSchema(schema)
.withConf(conf)
.build();
return writer;
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
public class TestParquetRecordSetWriter {
private ComponentLog componentLog;
private ParquetRecordSetWriter recordSetWriterFactory;
@Before
public void setup() {
recordSetWriterFactory = new ParquetRecordSetWriter();
componentLog = new MockComponentLog("1234", recordSetWriterFactory);
}
@Test
public void testWriteUsers() throws IOException, SchemaNotFoundException {
initRecordSetWriter("src/test/resources/avro/user.avsc");
// get the schema from the writer factory
final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
// write some records
final int numUsers = 10;
final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());
// write some records...
writeUsers(writeSchema, parquetFile, numUsers);
// read the records back in to verify
verifyParquetRecords(parquetFile, numUsers);
}
@Test
public void testWriteUsersWhenSchemaFormatNotAvro() throws IOException, SchemaNotFoundException {
initRecordSetWriter("src/test/resources/avro/user.avsc");
// get the schema from the writer factory
final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
final RecordSchema writeSchemaWithOtherFormat = new SimpleRecordSchema(writeSchema.getFields(), null, "OTHER-FORMAT", SchemaIdentifier.EMPTY);
// write some records
final int numUsers = 10;
final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());
// write some records...
writeUsers(writeSchemaWithOtherFormat, parquetFile, numUsers);
// read the records back in to verify
verifyParquetRecords(parquetFile, numUsers);
}
private void initRecordSetWriter(final String schemaFile) throws IOException {
final ConfigurationContext configurationContext = getConfigurationContextWithSchema(schemaFile);
// simulate enabling the service
recordSetWriterFactory.onEnabled(configurationContext);
recordSetWriterFactory.storeSchemaWriteStrategy(configurationContext);
recordSetWriterFactory.storeSchemaAccessStrategy(configurationContext);
}
private void writeUsers(final RecordSchema writeSchema, final File parquetFile, final int numUsers) throws IOException {
try(final OutputStream output = new FileOutputStream(parquetFile);
final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(componentLog, writeSchema, output, Collections.emptyMap())) {
for (int i = 0; i < numUsers; i++) {
final Map<String, Object> userFields = new HashMap<>();
userFields.put("name", "user" + i);
userFields.put("favorite_number", i);
userFields.put("favorite_color", "blue");
final Record userRecord = new MapRecord(writeSchema, userFields);
recordSetWriter.write(userRecord);
}
recordSetWriter.flush();
}
}
private void verifyParquetRecords(final File parquetFile, final int expectedRecordCount) throws IOException {
final Configuration conf = new Configuration();
final Path path = new Path(parquetFile.getPath());
final InputFile inputFile = HadoopInputFile.fromPath(path, conf);
try (final ParquetReader<GenericRecord> reader =
AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf).build()){
int recordCount = 0;
while(reader.read() != null) {
recordCount++;
}
assertEquals(expectedRecordCount, recordCount);
}
}
private ConfigurationContext getConfigurationContextWithSchema(String schemaPath) throws IOException {
final File schemaFile = new File(schemaPath);
final Map<PropertyDescriptor, String> properties = createPropertiesWithSchema(schemaFile);
return new MockConfigurationContext(properties, null);
}
private Map<PropertyDescriptor,String> createPropertiesWithSchema(final File schemaFile) throws IOException {
return createPropertiesWithSchema(IOUtils.toString(schemaFile.toURI()));
}
private Map<PropertyDescriptor,String> createPropertiesWithSchema(final String schemaText) {
final Map<PropertyDescriptor,String> propertyValues = new HashMap<>();
propertyValues.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
propertyValues.put(SchemaAccessUtils.SCHEMA_TEXT, schemaText);
return propertyValues;
}
}

View File

@ -42,7 +42,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.parquet.utils.ParquetUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;

View File

@ -60,10 +60,10 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createRecordReader(variables, in, logger);
return recordFactory.get().createRecordReader(variables, in, inputLength, logger);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}

View File

@ -97,9 +97,10 @@ class ScriptedReaderTest {
recordReaderFactory.initialize initContext
recordReaderFactory.onEnabled configurationContext
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
byte[] contentBytes = 'Flow file content not used'.bytes
InputStream inStream = new ByteArrayInputStream(contentBytes)
RecordReader recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, logger)
RecordReader recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, contentBytes.length, logger)
assertNotNull(recordReader)
3.times {
@ -157,7 +158,7 @@ class ScriptedReaderTest {
Map<String, String> schemaVariables = ['record.tag': 'myRecord']
InputStream inStream = new ByteArrayInputStream('''
byte[] contentBytes = '''
<root>
<myRecord>
<id>1</id>
@ -175,9 +176,11 @@ class ScriptedReaderTest {
<code>300</code>
</myRecord>
</root>
'''.bytes)
'''.bytes
RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, logger)
InputStream inStream = new ByteArrayInputStream(contentBytes)
RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, contentBytes.length, logger)
assertNotNull(recordReader)
3.times {

View File

@ -57,7 +57,7 @@ class GroovyRecordReader implements RecordReader {
class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new GroovyRecordReader()
}
}

View File

@ -67,7 +67,7 @@ class GroovyXmlRecordReaderFactory extends AbstractControllerService implements
// Will be set by the ScriptedRecordReaderFactory
ConfigurationContext configurationContext
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
// Expecting 'schema.text' to be a JSON array of objects whose keys are the field names and whose values map to a RecordFieldType
def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue()
if (!schemaText) return null

View File

@ -123,7 +123,7 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
// Get the first record and process it before we create the Record Writer. We do this so that if the Processor
// updates the Record's schema, we can provide an updated schema to the Record Writer. If there are no records,

View File

@ -121,7 +121,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
final Record firstRecord = reader.nextRecord();
if (firstRecord == null) {

View File

@ -17,20 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -73,6 +59,20 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@SideEffectFree
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -238,7 +238,7 @@ public class ForkRecord extends AbstractProcessor {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());
final OutputStream out = session.write(outFlowFile);

View File

@ -334,10 +334,10 @@ public class ListenTCPRecord extends AbstractProcessor {
synchronized (socketRecordReader) {
FlowFile flowFile = session.create();
try {
// lazily creating the record reader here b/c we need a flow file, eventually shouldn't have to do this
// lazily creating the record reader here
RecordReader recordReader = socketRecordReader.getRecordReader();
if (recordReader == null) {
recordReader = socketRecordReader.createRecordReader(flowFile, getLogger());
recordReader = socketRecordReader.createRecordReader(getLogger());
}
Record record;

View File

@ -242,7 +242,8 @@ public class ListenUDPRecord extends AbstractListenEventProcessor<StandardEvent>
final RecordReader reader;
final List<Record> records = new ArrayList<>();
try (final InputStream in = new ByteArrayInputStream(event.getData())) {
reader = readerFactory.createRecordReader(Collections.emptyMap(), in, getLogger());
final long inputLength = event.getData() == null ? -1 : event.getData().length;
reader = readerFactory.createRecordReader(Collections.emptyMap(), in, inputLength, getLogger());
Record record;
while((record = reader.nextRecord()) != null) {

View File

@ -17,22 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -68,6 +52,22 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -200,7 +200,7 @@ public class PartitionRecord extends AbstractProcessor {
try (final InputStream in = session.read(flowFile)) {
final Map<String, String> originalAttributes = flowFile.getAttributes();
final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger());
final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, flowFile.getSize(), getLogger());
final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema());

View File

@ -286,7 +286,7 @@ public class QueryRecord extends AbstractProcessor {
final RecordSchema readerSchema;
try (final InputStream rawIn = session.read(original)) {
final Map<String, String> originalAttributes = original.getAttributes();
final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger());
final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, original.getSize(), getLogger());
readerSchema = reader.getSchema();
writerSchema = recordSetWriterFactory.getSchema(originalAttributes, readerSchema);

View File

@ -17,17 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@ -61,6 +50,17 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@EventDriven
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@ -153,7 +153,7 @@ public class SplitRecord extends AbstractProcessor {
session.read(original, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) {
try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, original.getSize(), getLogger())) {
final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema());

View File

@ -17,15 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.json.JsonRecordSetWriter;
@ -49,6 +40,15 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class TestForkRecord {
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
@ -461,7 +461,8 @@ public class TestForkRecord {
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger)
throws MalformedRecordException, IOException, SchemaNotFoundException {
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
}

View File

@ -17,14 +17,6 @@
package org.apache.nifi.processors.standard;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.csv.CSVReader;
@ -48,6 +40,14 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -257,9 +257,10 @@ public class TestValidateRecord {
runner.setProperty(avroReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.enableControllerService(avroReader);
final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
final byte[] validFlowFileBytes = validFlowFile.toByteArray();
try (
final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFile.toByteArray());
final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, runner.getLogger());
final ByteArrayInputStream resultContentStream = new ByteArrayInputStream(validFlowFileBytes);
final RecordReader recordReader = avroReader.createRecordReader(validFlowFile.getAttributes(), resultContentStream, validFlowFileBytes.length, runner.getLogger());
) {
final RecordSchema resultSchema = recordReader.getSchema();
assertEquals(3, resultSchema.getFieldCount());

View File

@ -302,7 +302,7 @@ public class RestLookupService extends AbstractControllerService implements Reco
final Record record;
try (final InputStream is = responseBody.byteStream();
final InputStream bufferedIn = new BufferedInputStream(is)) {
record = handleResponse(bufferedIn, context);
record = handleResponse(bufferedIn, responseBody.contentLength(), context);
}
return Optional.ofNullable(record);
@ -342,9 +342,9 @@ public class RestLookupService extends AbstractControllerService implements Reco
return client.newCall(request).execute();
}
private Record handleResponse(InputStream is, Map<String, String> context) throws SchemaNotFoundException, MalformedRecordException, IOException {
private Record handleResponse(InputStream is, long inputLength, Map<String, String> context) throws SchemaNotFoundException, MalformedRecordException, IOException {
try (RecordReader reader = readerFactory.createRecordReader(context, is, getLogger())) {
try (RecordReader reader = readerFactory.createRecordReader(context, is, inputLength, getLogger())) {
Record record = reader.nextRecord();

View File

@ -37,7 +37,7 @@ public interface RecordReaderFactory extends ControllerService {
/**
* Create a RecordReader instance to read records from specified InputStream.
* This method calls {@link #createRecordReader(Map, InputStream, ComponentLog)} with Attributes of the specified FlowFile.
* This method calls {@link #createRecordReader(Map, InputStream, long, ComponentLog)} with Attributes of the specified FlowFile.
* @param flowFile Attributes of this FlowFile are used to resolve Record Schema via Expression Language dynamically. This can be null.
*
* @param in InputStream containing Records.
@ -46,7 +46,7 @@ public interface RecordReaderFactory extends ControllerService {
* @return Created RecordReader instance
*/
default RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return createRecordReader(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(), in, logger);
return createRecordReader(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(), in, flowFile == null ? -1 : flowFile.getSize(), logger);
}
/**
@ -68,10 +68,11 @@ public interface RecordReaderFactory extends ControllerService {
* @param variables A map containing variables which is used to resolve the Record Schema dynamically via Expression Language.
* This can be null or empty.
* @param in InputStream containing Records.
* @param inputLength the length of the content to read from the InputStream in bytes; a negative value indicates an unknown or unbounded size
* @param logger A logger bound to a component
*
* @return Created RecordReader instance
*/
RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException;
}
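
In practice the new long parameter is simply the content length when the caller knows it and a negative value when it does not: processors pass FlowFile.getSize(), in-memory callers pass the backing byte array's length, and unbounded streams pass -1. A short hedged sketch of a caller follows; readerFactory, session, flowFile, and logger are assumed to be provided by the surrounding processor and are not defined by this interface.

// Illustrative sketch of invoking the updated factory method from a processor.
final Map<String, String> attributes = flowFile.getAttributes();
try (final InputStream in = session.read(flowFile);
     final RecordReader reader = readerFactory.createRecordReader(attributes, in, flowFile.getSize(), logger)) {
    Record record;
    while ((record = reader.nextRecord()) != null) {
        // process each record
    }
}
// For content of unknown length (e.g. an unbounded socket stream), pass -1 instead of a size.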

View File

@ -94,7 +94,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue();
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
return new AvroReaderWithEmbeddedSchema(in);

View File

@ -139,7 +139,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
// Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header.
in.mark(1024 * 1024);
final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(in), null);

View File

@ -267,7 +267,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
}

View File

@ -162,7 +162,8 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, MalformedRecordException, SchemaNotFoundException {
final InputStream bufferedIn = new BufferedInputStream(in);
final RecordSchema schema = getSchema(variables, bufferedIn, null);
return new JsonPathRowRecordReader(jsonPaths, schema, bufferedIn, logger, dateFormat, timeFormat, timestampFormat);

View File

@ -105,7 +105,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
}

View File

@ -143,7 +143,7 @@ public class Syslog5424Reader extends SchemaRegistryService implements RecordRea
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new Syslog5424RecordReader(parser, in, schema);
}

View File

@ -136,7 +136,7 @@ public class SyslogReader extends SchemaRegistryService implements RecordReaderF
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new SyslogRecordReader(parser, in, schema);
}

View File

@ -160,7 +160,8 @@ public class XMLReader extends SchemaRegistryService implements RecordReaderFact
}
@Override
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException, MalformedRecordException {
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, SchemaNotFoundException, MalformedRecordException {
final ConfigurationContext context = getConfigurationContext();
final RecordSchema schema = getSchema(variables, in, null);