From 1f67cbf62886019d1148468fd8683ed78b4d0a12 Mon Sep 17 00:00:00 2001
From: Koji Kawamura
Date: Thu, 1 Jun 2017 21:50:17 +0900
Subject: [PATCH] NIFI-4004: Use RecordReaderFactory without FlowFile.

- Removed FlowFile from RecordReaderFactory, RecordSetWriterFactory and
  SchemaAccessStrategy.
- Renamed the variable 'allowableValue' to 'strategy' to better represent its meaning.
- Removed the creation of a temporary FlowFile used to resolve the Record Schema in
  ConsumerLease.
- Removed the unnecessary 'InputStream content' argument from the
  RecordSetWriterFactory.getSchema method.

This closes #1877.
---
 .../schema/access/AvroSchemaTextStrategy.java | 9 +++---
 .../hadoop/AbstractFetchHDFSRecord.java | 6 ++--
 .../record/CommaSeparatedRecordReader.java | 3 +-
 .../record/MockRecordParser.java | 8 +----
 .../record/MockRecordWriter.java | 6 ++--
 .../ConfluentSchemaRegistryStrategy.java | 4 +--
 ...worksAttributeSchemaReferenceStrategy.java | 24 ++++++++------
 ...onworksEncodedSchemaReferenceStrategy.java | 4 +--
 .../access/InheritSchemaFromRecord.java | 4 +--
 .../schema/access/SchemaAccessStrategy.java | 24 +++++++-------
 .../access/SchemaNamePropertyStrategy.java | 8 ++---
 .../kafka/pubsub/ConsumerLease.java | 14 +++-----
 .../kafka/pubsub/PublishKafkaRecord_0_10.java | 5 +--
 .../kafka/pubsub/PublisherLease.java | 2 +-
 .../kafka/pubsub/util/MockRecordParser.java | 3 +-
 .../kafka/pubsub/util/MockRecordWriter.java | 6 ++--
 .../processors/parquet/FetchParquetTest.java | 2 +-
 .../nifi/record/script/ScriptedReader.java | 6 ++--
 .../script/ScriptedRecordSetWriter.java | 10 +++---
 .../record/script/ScriptedReaderTest.groovy | 9 ++----
 .../script/ScriptedRecordSetWriterTest.groovy | 6 ++--
 .../groovy/test_record_reader_inline.groovy | 3 +-
 .../groovy/test_record_reader_xml.groovy | 5 ++-
 .../groovy/test_record_writer_inline.groovy | 11 +++----
 .../standard/AbstractRecordProcessor.java | 7 ++--
 .../standard/AbstractRouteRecord.java | 7 ++--
 .../processors/standard/ListenTCPRecord.java | 4 +--
 .../processors/standard/PartitionRecord.java | 7 ++--
 .../nifi/processors/standard/QueryRecord.java | 7 ++--
 .../nifi/processors/standard/SplitRecord.java | 7 ++--
 .../processors/standard/ValidateRecord.java | 2 +-
 .../processors/standard/merge/RecordBin.java | 2 +-
 .../processors/standard/TestQueryRecord.java | 6 ++--
 .../serialization/RecordReaderFactory.java | 30 ++++++++++++++---
 .../serialization/RecordSetWriterFactory.java | 32 ++++++++++---------
 .../java/org/apache/nifi/avro/AvroReader.java | 11 +++----
 .../apache/nifi/avro/AvroRecordSetWriter.java | 3 +-
 .../EmbeddedAvroSchemaAccessStrategy.java | 4 +--
 .../nifi/csv/CSVHeaderSchemaStrategy.java | 4 +--
 .../java/org/apache/nifi/csv/CSVReader.java | 12 +++---
 .../apache/nifi/csv/CSVRecordSetWriter.java | 3 +-
 .../java/org/apache/nifi/grok/GrokReader.java | 20 ++++++------
 .../org/apache/nifi/json/JsonPathReader.java | 6 ++--
 .../apache/nifi/json/JsonRecordSetWriter.java | 3 +-
 .../org/apache/nifi/json/JsonTreeReader.java | 6 ++--
 .../SchemaRegistryRecordSetWriter.java | 16 +++++-----
 .../serialization/SchemaRegistryService.java | 23 ++++++-------
 .../text/FreeFormTextRecordSetWriter.java | 13 ++++----
 48 files changed, 211 insertions(+), 206 deletions(-)

diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
index
5bf084ef29..53a297f309 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java @@ -20,13 +20,13 @@ package org.apache.nifi.schema.access; import org.apache.avro.Schema; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.serialization.record.RecordSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.InputStream; import java.util.EnumSet; +import java.util.Map; import java.util.Set; public class AvroSchemaTextStrategy implements SchemaAccessStrategy { @@ -40,13 +40,14 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { - final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue(); + public RecordSchema getSchema(Map variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException { + final String schemaText; + schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(variables).getValue(); if (schemaText == null || schemaText.trim().isEmpty()) { throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text"); } - logger.debug("For {} found schema text {}", flowFile, schemaText); + logger.debug("For {} found schema text {}", variables, schemaText); try { final Schema avroSchema = new Schema.Parser().parse(schemaText); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java index fbbbbf4eba..d6a374f50f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java @@ -190,16 +190,16 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor { // use a child FlowFile so that if any error occurs we can route the original untouched FlowFile to retry/failure child = session.create(originalFlowFile); - final FlowFile writableFlowFile = child; final AtomicReference mimeTypeRef = new AtomicReference<>(); child = session.write(child, (final OutputStream rawOut) -> { try (final BufferedOutputStream out = new BufferedOutputStream(rawOut); final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) { Record record = recordReader.nextRecord(); - final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, record == null ? null : record.getSchema()); + final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile.getAttributes(), + record == null ? 
null : record.getSchema()); - try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) { + try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, out)) { recordSetWriter.beginRecordSet(); if (record != null) { recordSetWriter.write(record); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java index 1c22687893..6597b75e4d 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/CommaSeparatedRecordReader.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; @@ -52,7 +51,7 @@ public class CommaSeparatedRecordReader extends AbstractControllerService implem } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); final List fields = new ArrayList<>(); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java index b6606a849a..0fcdcbf332 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordParser.java @@ -26,18 +26,12 @@ import java.util.List; import java.util.Map; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.MapRecord; -import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; -import org.apache.nifi.serialization.record.RecordSchema; public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory { private final List records = new ArrayList<>(); 
@@ -65,7 +59,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor } @Override - public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { final Iterator itr = records.iterator(); return new RecordReader() { diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java index 9bde6473b1..d7579e82a5 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java @@ -20,9 +20,9 @@ package org.apache.nifi.serialization.record; import java.io.IOException; import java.io.OutputStream; import java.util.Collections; +import java.util.Map; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; @@ -54,12 +54,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema schema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(Map variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { return new SimpleRecordSchema(Collections.emptyList()); } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { private int recordCount = 0; private boolean headerWritten = false; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java index f892ab882e..b71e811f61 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java @@ -22,9 +22,9 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.stream.io.StreamUtils; @@ -43,7 +43,7 @@ public class ConfluentSchemaRegistryStrategy implements 
SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final byte[] buffer = new byte[5]; try { StreamUtils.fillBuffer(contentStream, buffer); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java index 073a45359c..19606c0601 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java @@ -17,7 +17,6 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; @@ -25,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy { @@ -46,34 +46,38 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess schemaFields.addAll(schemaRegistry == null ? 
Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); } + public boolean isFlowFileRequired() { + return true; + } + @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { - final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE); - final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE); - final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); + public RecordSchema getSchema(Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE); + final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE); + final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because it is missing one of the following three required attributes: " + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: " + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); } if (!isNumber(schemaProtocol)) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" + schemaProtocol + "', which is not a valid Protocol Version number"); } final int protocol = Integer.parseInt(schemaProtocol); if (protocol != 1) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" + schemaProtocol + "', which is not a valid Protocol Version number. 
Expected Protocol Version to be 1."); } if (!isNumber(schemaIdentifier)) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '" + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '" + schemaProtocol + "', which is not a valid Schema Identifier number"); } if (!isNumber(schemaVersion)) { - throw new SchemaNotFoundException("Could not determine Schema for " + flowFile + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" + schemaProtocol + "', which is not a valid Schema Version number"); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java index b2e5a48234..74bde54926 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java @@ -17,7 +17,6 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.stream.io.StreamUtils; @@ -27,6 +26,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy { @@ -45,7 +45,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final byte[] buffer = new byte[13]; try { StreamUtils.fillBuffer(contentStream, buffer); diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java index d1ed63db8d..b25ea0b5bb 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java @@ -20,15 +20,15 @@ package org.apache.nifi.schema.access; import java.io.IOException; import java.io.InputStream; import java.util.EnumSet; +import java.util.Map; import java.util.Set; -import 
org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.serialization.record.RecordSchema; public class InheritSchemaFromRecord implements SchemaAccessStrategy { @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(final Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { if (readSchema == null) { throw new SchemaNotFoundException("Cannot inherit Schema from Record because no schema was found"); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java index 923eaf0725..93b9de9755 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java @@ -17,26 +17,28 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; import java.io.InputStream; +import java.util.Map; import java.util.Set; public interface SchemaAccessStrategy { - /** - * Returns the schema for the given FlowFile using the supplied stream of content and configuration - * - * @param flowFile flowfile - * @param contentStream content of flowfile - * @param readSchema the schema that was read from the input FlowFile, or null if there was none - * @return the RecordSchema for the FlowFile - */ - RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException; /** - * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}. + * Returns the schema for the given FlowFile using the supplied stream of content and configuration. + * + * @param variables Variables which is used to resolve Record Schema via Expression Language. + * This can be null or empty. + * @param contentStream The stream which is used to read the serialized content. + * @param readSchema The schema that was read from the input content, or null if there was none. + * @return The RecordSchema for the content. + */ + RecordSchema getSchema(Map variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException; + + /** + * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(Map, InputStream, RecordSchema)}. 
*/ Set getSuppliedSchemaFields(); } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java index 796e1e4565..07ba3f3be3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java @@ -18,13 +18,13 @@ package org.apache.nifi.schema.access; import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; import java.io.InputStream; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { @@ -43,10 +43,10 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { - final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue(); + public RecordSchema getSchema(final Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { + final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(variables).getValue(); if (schemaName.trim().isEmpty()) { - throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name."); + throw new SchemaNotFoundException(String.format("%s did not provide appropriate Schema Name", schemaNamePropertyValue)); } try { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 9b24bc7692..1f58ab3965 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -29,6 +29,7 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -456,18 +457,13 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } private void writeRecordData(final ProcessSession session, final List> records, final TopicPartition topicPartition) { - // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile. - // We don't want to create a new FlowFile for each record that we receive, so we will just create - // a "temporary flowfile" that will be removed in the finally block below and use that to pass to - // the createRecordReader method. 
- final FlowFile tempFlowFile = session.create(); RecordSetWriter writer = null; try { for (final ConsumerRecord consumerRecord : records) { final Record record; try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) { - final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger); + final RecordReader reader = readerFactory.createRecordReader(Collections.EMPTY_MAP, in, logger); record = reader.nextRecord(); } catch (final Exception e) { handleParseFailure(consumerRecord, session, e); @@ -490,7 +486,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordSchema writeSchema; try { - writeSchema = writerFactory.getSchema(flowFile, recordSchema); + writeSchema = writerFactory.getSchema(Collections.emptyMap(), recordSchema); } catch (final Exception e) { logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e); @@ -504,7 +500,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe throw new ProcessException(e); } - writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut); + writer = writerFactory.createWriter(logger, writeSchema, rawOut); writer.beginRecordSet(); tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer); @@ -544,8 +540,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } throw new ProcessException(e); - } finally { - session.remove(tempFlowFile); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index 21b2e314cb..0e6ca6f3de 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -325,16 +325,17 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue(); + final Map attributes = flowFile.getAttributes(); try { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { try (final InputStream in = new BufferedInputStream(rawIn)) { - final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + final RecordReader reader = readerFactory.createRecordReader(attributes, in, getLogger()); final RecordSet recordSet = reader.createRecordSet(); - final RecordSchema schema = writerFactory.getSchema(flowFile, recordSet.getSchema()); + final RecordSchema schema = writerFactory.getSchema(attributes, recordSet.getSchema()); lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic); } catch (final SchemaNotFoundException | MalformedRecordException e) { throw new ProcessException(e); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java index 2004346423..8603bbcbd3 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java @@ -106,7 +106,7 @@ public class PublisherLease implements Closeable { Record record; int recordCount = 0; - try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) { + try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) { while ((record = recordSet.next()) != null) { recordCount++; baos.reset(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java index 9a9fed7c25..21527be89f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.MalformedRecordException; @@ -64,7 +63,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor } @Override - public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException { final BufferedReader reader = new BufferedReader(new InputStreamReader(in)); return new RecordReader() { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 60e494b50e..90a909d9b6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -20,9 +20,9 @@ package org.apache.nifi.processors.kafka.pubsub.util; import java.io.IOException; import java.io.OutputStream; import java.util.Collections; +import java.util.Map; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; @@ -52,12 +52,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSchema getSchema(FlowFile flowFile, 
RecordSchema schema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(Map variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { return null; } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { @Override diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java index ffff2a3771..76d44aab5c 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -220,7 +220,7 @@ public class FetchParquetTest { final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class); when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory"); - when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(FlowFile.class), any(OutputStream.class))).thenReturn(recordSetWriter); + when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class), any(OutputStream.class))).thenReturn(recordSetWriter); testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory); testRunner.enableControllerService(recordSetWriterFactory); diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java index d95c87f1ee..7b544fb9a6 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java @@ -22,7 +22,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -37,6 +36,7 @@ import java.io.InputStream; import java.lang.reflect.UndeclaredThrowableException; import java.util.Collection; import java.util.HashSet; +import java.util.Map; /** * A RecordReader implementation that allows the user to script the RecordReader instance @@ -52,10 +52,10 @@ public class ScriptedReader extends AbstractScriptedRecordFactory variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { if (recordFactory.get() != null) { try { - return recordFactory.get().createRecordReader(flowFile, in, logger); + return recordFactory.get().createRecordReader(variables, in, logger); } catch (UndeclaredThrowableException ute) { throw new 
IOException(ute.getCause()); } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java index c0160e650a..7544ae2f50 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -21,6 +21,7 @@ import java.io.OutputStream; import java.lang.reflect.UndeclaredThrowableException; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import javax.script.Invocable; import javax.script.ScriptException; @@ -31,7 +32,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -55,10 +55,10 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { final RecordSetWriterFactory writerFactory = recordFactory.get(); if (writerFactory == null) { return null; } try { - return writerFactory.getSchema(flowFile, readSchema); + return writerFactory.getSchema(variables, readSchema); } catch (UndeclaredThrowableException ute) { throw new IOException(ute.getCause()); } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy index 440ecb25bb..6142ddd7cf 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy @@ -27,7 +27,6 @@ import org.apache.nifi.script.ScriptingComponentHelper import org.apache.nifi.script.ScriptingComponentUtils import org.apache.nifi.serialization.RecordReader import org.apache.nifi.util.MockComponentLog -import org.apache.nifi.util.MockFlowFile import org.apache.nifi.util.MockPropertyValue import org.apache.nifi.util.TestRunners import org.junit.Before @@ -98,10 +97,9 @@ class ScriptedReaderTest { recordReaderFactory.initialize initContext recordReaderFactory.onEnabled configurationContext - MockFlowFile mockFlowFile = new MockFlowFile(1L) InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) - RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger) + RecordReader recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, logger) assertNotNull(recordReader) 3.times { @@ -157,8 +155,7 @@ class ScriptedReaderTest { recordReaderFactory.initialize initContext recordReaderFactory.onEnabled configurationContext - MockFlowFile mockFlowFile = new MockFlowFile(1L) - 
mockFlowFile.putAttributes(['record.tag': 'myRecord']) + Map schemaVariables = ['record.tag': 'myRecord'] InputStream inStream = new ByteArrayInputStream(''' @@ -180,7 +177,7 @@ class ScriptedReaderTest { '''.bytes) - RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger) + RecordReader recordReader = recordReaderFactory.createRecordReader(schemaVariables, inStream, logger) assertNotNull(recordReader) 3.times { diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy index 22e984ac91..c3a7990e24 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy @@ -30,7 +30,6 @@ import org.apache.nifi.serialization.record.MapRecord import org.apache.nifi.serialization.record.RecordField import org.apache.nifi.serialization.record.RecordFieldType import org.apache.nifi.serialization.record.RecordSet -import org.apache.nifi.util.MockFlowFile import org.apache.nifi.util.MockPropertyValue import org.apache.nifi.util.TestRunners import org.junit.Before @@ -100,11 +99,10 @@ class ScriptedRecordSetWriterTest { recordSetWriterFactory.initialize initContext recordSetWriterFactory.onEnabled configurationContext - MockFlowFile mockFlowFile = new MockFlowFile(1L) - def schema = recordSetWriterFactory.getSchema(mockFlowFile, null) + def schema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null) ByteArrayOutputStream outputStream = new ByteArrayOutputStream() - RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, mockFlowFile, outputStream) + RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema, outputStream) assertNotNull(recordSetWriter) def recordSchema = new SimpleRecordSchema( diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy index 5e6a9ca385..83e772062f 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy @@ -16,7 +16,6 @@ */ import org.apache.nifi.controller.AbstractControllerService -import org.apache.nifi.flowfile.FlowFile import org.apache.nifi.logging.ComponentLog import org.apache.nifi.schema.access.SchemaNotFoundException import org.apache.nifi.serialization.MalformedRecordException @@ -58,7 +57,7 @@ class GroovyRecordReader implements RecordReader { class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory { - RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + RecordReader createRecordReader(Map variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, 
SchemaNotFoundException { return new GroovyRecordReader() } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy index b94c380f40..c7296099ad 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy @@ -18,7 +18,6 @@ import groovy.json.JsonSlurper import org.apache.nifi.controller.AbstractControllerService import org.apache.nifi.controller.ConfigurationContext -import org.apache.nifi.flowfile.FlowFile import org.apache.nifi.logging.ComponentLog import org.apache.nifi.schema.access.SchemaNotFoundException import org.apache.nifi.serialization.MalformedRecordException @@ -68,7 +67,7 @@ class GroovyXmlRecordReaderFactory extends AbstractControllerService implements // Will be set by the ScriptedRecordReaderFactory ConfigurationContext configurationContext - RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + RecordReader createRecordReader(Map variables, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { // Expecting 'schema.text' to have be an JSON array full of objects whose keys are the field name and the value maps to a RecordFieldType def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue() if (!schemaText) return null @@ -77,7 +76,7 @@ class GroovyXmlRecordReaderFactory extends AbstractControllerService implements def entry = field.entrySet()[0] new RecordField(entry.key, RecordFieldType.of(entry.value).dataType) } as List) - return new GroovyXmlRecordReader(flowFile.getAttribute('record.tag'), recordSchema, inputStream) + return new GroovyXmlRecordReader(variables.get('record.tag'), recordSchema, inputStream) } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy index fa4a552907..ccdb9ae7f7 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -17,9 +17,6 @@ import groovy.xml.MarkupBuilder -import java.io.IOException -import java.io.InputStream - import org.apache.nifi.controller.AbstractControllerService import org.apache.nifi.flowfile.FlowFile import org.apache.nifi.logging.ComponentLog @@ -102,12 +99,12 @@ class GroovyRecordSetWriter implements RecordSetWriter { class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { @Override - public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException { - return null; + RecordSchema getSchema(Map variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + return null } - + @Override - RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, 
FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException { + RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException { return new GroovyRecordSetWriter(out) } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java index ab1219e17e..422629d528 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -104,15 +104,16 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor { final AtomicInteger recordCount = new AtomicInteger(); final FlowFile original = flowFile; + final Map originalAttributes = flowFile.getAttributes(); try { flowFile = session.write(flowFile, new StreamCallback() { @Override public void process(final InputStream in, final OutputStream out) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { - final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema()); - try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) { + final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); + try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out)) { writer.beginRecordSet(); Record record; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java index 6664c7ba65..374ed483af 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java @@ -116,13 +116,14 @@ public abstract class AbstractRouteRecord extends AbstractProcessor { final AtomicInteger numRecords = new AtomicInteger(0); final Map> writers = new HashMap<>(); final FlowFile original = flowFile; + final Map originalAttributes = original.getAttributes(); try { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { - final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema()); + final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); Record record; while ((record = reader.nextRecord()) != null) { @@ -135,7 +136,7 @@ public abstract class AbstractRouteRecord extends AbstractProcessor { if (tuple == null) { FlowFile 
outFlowFile = session.create(original); final OutputStream out = session.write(outFlowFile); - recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, original, out); + recordSetWriter = writerFactory.createWriter(getLogger(), writeSchema, out); recordSetWriter.beginRecordSet(); tuple = new Tuple<>(outFlowFile, recordSetWriter); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java index 4fa5f17a08..7c3e978f9d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCPRecord.java @@ -376,9 +376,9 @@ public class ListenTCPRecord extends AbstractProcessor { String mimeType = null; WriteResult writeResult = null; - final RecordSchema recordSchema = recordSetWriterFactory.getSchema(flowFile, record.getSchema()); + final RecordSchema recordSchema = recordSetWriterFactory.getSchema(Collections.EMPTY_MAP, record.getSchema()); try (final OutputStream out = session.write(flowFile); - final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, flowFile, out)) { + final RecordSetWriter recordWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) { // start the record set and write the first record from above recordWriter.beginRecordSet(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java index 56267fecff..4a86e8199b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java @@ -191,9 +191,10 @@ public class PartitionRecord extends AbstractProcessor { final Map writerMap = new HashMap<>(); try (final InputStream in = session.read(flowFile)) { - final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger()); + final Map originalAttributes = flowFile.getAttributes(); + final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger()); - final RecordSchema writeSchema = writerFactory.getSchema(flowFile, reader.getSchema()); + final RecordSchema writeSchema = writerFactory.getSchema(originalAttributes, reader.getSchema()); Record record; while ((record = reader.nextRecord()) != null) { @@ -221,7 +222,7 @@ public class PartitionRecord extends AbstractProcessor { final OutputStream out = session.write(childFlowFile); - writer = writerFactory.createWriter(getLogger(), writeSchema, childFlowFile, out); + writer = writerFactory.createWriter(getLogger(), writeSchema, out); writer.beginRecordSet(); writerMap.put(recordValueMap, writer); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index f1d28d155a..bcca7f484a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -251,10 +251,11 @@ public class QueryRecord extends AbstractProcessor { // Determine the schema for writing the data final RecordSchema recordSchema; try (final InputStream rawIn = session.read(original)) { - final RecordReader reader = recordReaderFactory.createRecordReader(original, rawIn, getLogger()); + final Map originalAttributes = original.getAttributes(); + final RecordReader reader = recordReaderFactory.createRecordReader(originalAttributes, rawIn, getLogger()); final RecordSchema inputSchema = reader.getSchema(); - recordSchema = recordSetWriterFactory.getSchema(original, inputSchema); + recordSchema = recordSetWriterFactory.getSchema(originalAttributes, inputSchema); } catch (final Exception e) { getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e}); session.transfer(original, REL_FAILURE); @@ -294,7 +295,7 @@ public class QueryRecord extends AbstractProcessor { transformed = session.write(transformed, new OutputStreamCallback() { @Override public void process(final OutputStream out) throws IOException { - try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) { + try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, out)) { final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs); writeResultRef.set(resultSetWriter.write(resultSet)); mimeTypeRef.set(resultSetWriter.getMimeType()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java index 00546d2dfb..947d997e5d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java @@ -135,13 +135,14 @@ public class SplitRecord extends AbstractProcessor { final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger(); final List splits = new ArrayList<>(); + final Map originalAttributes = original.getAttributes(); try { session.read(original, new InputStreamCallback() { @Override public void process(final InputStream in) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + try (final RecordReader reader = readerFactory.createRecordReader(originalAttributes, in, getLogger())) { - final RecordSchema schema = writerFactory.getSchema(original, reader.getSchema()); + final RecordSchema schema = writerFactory.getSchema(originalAttributes, reader.getSchema()); final RecordSet recordSet = reader.createRecordSet(); final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet); @@ -154,7 +155,7 @@ public class SplitRecord extends AbstractProcessor { final WriteResult 
writeResult; try (final OutputStream out = session.write(split); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, split, out)) { + final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) { if (maxRecords == 1) { final Record record = pushbackSet.next(); writeResult = writer.write(record); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index 8e8fca8999..646db889cf 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -419,7 +419,7 @@ public class ValidateRecord extends AbstractProcessor { } final OutputStream out = session.write(flowFile); - final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, flowFile, out); + final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, out); created.beginRecordSet(); return created; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java index 0b4cf316fa..ad60f6ae64 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -135,7 +135,7 @@ public class RecordBin { this.out = new ByteCountingOutputStream(rawOut); - recordWriter = writerFactory.createWriter(logger, record.getSchema(), flowFile, out); + recordWriter = writerFactory.createWriter(logger, record.getSchema(), out); recordWriter.beginRecordSet(); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 0443a1132a..4e93253bf8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -22,10 +22,10 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -260,7 +260,7 @@ public class TestQueryRecord { } @Override - public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(Map variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException { 
final List recordFields = columnNames.stream() .map(name -> new RecordField(name, RecordFieldType.STRING.getDataType())) .collect(Collectors.toList()); @@ -268,7 +268,7 @@ public class TestQueryRecord { } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new RecordSetWriter() { @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java index 7d7268e2e4..6ff380da60 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java @@ -17,14 +17,16 @@ package org.apache.nifi.serialization; -import java.io.IOException; -import java.io.InputStream; - import org.apache.nifi.controller.ControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; +import java.util.Map; + /** *

* A Controller Service that is responsible for creating a {@link RecordReader}. @@ -32,6 +34,26 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; */ public interface RecordReaderFactory extends ControllerService { - RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException; + /** + * Create a RecordReader instance to read records from the specified InputStream. + * This method calls {@link #createRecordReader(Map, InputStream, ComponentLog)} with the Attributes of the specified FlowFile. + * @param flowFile Attributes of this FlowFile are used to resolve the Record Schema dynamically via Expression Language. This can be null. + * @param in InputStream containing Records. This can be null or an empty stream. + * @param logger A logger bound to a component + * @return Created RecordReader instance + */ + default RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + return createRecordReader(flowFile == null ? Collections.emptyMap() : flowFile.getAttributes(), in, logger); + } + + /** + * Create a RecordReader instance to read records from the specified InputStream. + * @param variables A map containing variables that are used to resolve the Record Schema dynamically via Expression Language. + * This can be null or empty. + * @param in InputStream containing Records. This can be null or an empty stream. + * @param logger A logger bound to a component + * @return Created RecordReader instance + */ + RecordReader createRecordReader(Map variables, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java index 22b88050de..a9032e4ae6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java @@ -18,24 +18,28 @@ package org.apache.nifi.serialization; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; import org.apache.nifi.controller.ControllerService; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; /** *

- * A Controller Service that is responsible for creating a {@link RecordSetWriter}. The writer is created - based on a FlowFile and an InputStream for that FlowFile, but it is important to note that this the FlowFile passed - to the {@link #createWriter(ComponentLog, FlowFile, InputStream)} may not be the FlowFile that the Writer will writer to. - Rather, it is the FlowFile and InputStream from which the Writer's Schema should be determined. This is done because most - Processors that make use of Record Writers also make use of Record Readers and the schema for the output is often determined - by either reading the schema from the content of the input FlowFile or from referencing attributes of the - input FlowFile. + * A Controller Service that is responsible for creating a {@link RecordSetWriter}.
+ *
+ * A writer is created with a RecordSchema and an OutputStream that the writer will write to.
+ *
+ * The schema can be retrieved by the {@link #getSchema(Map, RecordSchema)} method, based on a Map containing variables and a RecordSchema that is read from the incoming FlowFile.
+ *
+ * Most Processors that make use of Record Writers also make use of Record Readers, and the schema for the output is often determined either by referencing attributes of the input FlowFile or by reading its content.
+ * In such a case, if a RecordSchema is already known and available when calling the {@link #getSchema(Map, RecordSchema)} method, it should be supplied so that it can be reused.
 *
 *

@@ -47,18 +51,17 @@ public interface RecordSetWriterFactory extends ControllerService { /** *

- * Returns the Schema that will be used for writing Records. Note that the FlowFile and InputStream that are given - * may well be different than the FlowFile that the writer will write to. The given FlowFile and InputStream are + * Returns the Schema that will be used for writing Records. The given variables are * intended to be used for determining the schema that should be used when writing records. *

* - * @param flowFile the FlowFile from which the schema should be determined. + * @param variables the variables that are used to resolve the Record Schema via Expression Language; may be null or empty * @param readSchema the schema that was read from the incoming FlowFile, or null if there is no input schema * * @return the Schema that should be used for writing Records * @throws SchemaNotFoundException if unable to find the schema */ - RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException; + RecordSchema getSchema(Map variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException; /** *

@@ -68,11 +71,10 @@ public interface RecordSetWriterFactory extends ControllerService { * @param logger the logger to use when logging information. This is passed in, rather than using the logger of the Controller Service * because it allows messages to be logged for the component that is calling this Controller Service. * @param schema the schema that will be used for writing records - * @param flowFile the FlowFile to write to * @param out the OutputStream to write to * * @return a RecordSetWriter that can write record sets to an OutputStream * @throws IOException if unable to read from the given InputStream */ - RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, FlowFile flowFile, OutputStream out) throws SchemaNotFoundException, IOException; + RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) throws SchemaNotFoundException, IOException; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index 88b657cfa9..eed37f867f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -31,7 +31,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -66,11 +65,11 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } @Override - protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) { - if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { + protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry schemaRegistry, ConfigurationContext context) { + if (EMBEDDED_AVRO_SCHEMA.getValue().equals(strategy)) { return new EmbeddedAvroSchemaAccessStrategy(); } else { - return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + return super.getSchemaAccessStrategy(strategy, schemaRegistry, context); } } @@ -84,12 +83,12 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue(); if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { return new AvroReaderWithEmbeddedSchema(in); } else { - final 
RecordSchema recordSchema = getSchema(flowFile, in, null); + final RecordSchema recordSchema = getSchema(variables, in, null); final Schema avroSchema; try { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index a8459a9cdc..7e498414d4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -36,7 +36,6 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.schema.access.SchemaField; @@ -80,7 +79,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement "The FlowFile will have the Avro schema embedded into the content, as is typical with Avro"); @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final FlowFile flowFile, final OutputStream out) throws IOException { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema recordSchema, final OutputStream out) throws IOException { final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue(); final String compressionFormat = getConfigurationContext().getProperty(COMPRESSION_FORMAT).getValue(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java index d32e3e5f15..b289cbea73 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java @@ -20,13 +20,13 @@ package org.apache.nifi.avro; import java.io.IOException; import java.io.InputStream; import java.util.EnumSet; +import java.util.Map; import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements 
SchemaAccessStrategy { private final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final DataFileStream dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader()); final Schema avroSchema = dataFileStream.getSchema(); final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index fa60b2a86e..642f360817 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -23,6 +23,7 @@ import java.io.Reader; import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.commons.csv.CSVFormat; @@ -30,7 +31,6 @@ import org.apache.commons.csv.CSVParser; import org.apache.commons.io.input.BOMInputStream; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; @@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { } @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { + public RecordSchema getSchema(Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException { if (this.context == null) { throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema"); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index 135dd8018e..f15f85d60a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import 
org.apache.commons.csv.CSVFormat; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -31,7 +32,6 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaAccessUtils; @@ -99,23 +99,23 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { // Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header. final BufferedInputStream bufferedIn = new BufferedInputStream(in); bufferedIn.mark(1024 * 1024); - final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null); + final RecordSchema schema = getSchema(variables, new NonCloseableInputStream(bufferedIn), null); bufferedIn.reset(); return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat); } @Override - protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { - if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) { + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { + if (strategy.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) { return new CSVHeaderSchemaStrategy(context); } - return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + return super.getSchemaAccessStrategy(strategy, schemaRegistry, context); } @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index c5e6b192d2..bd2e60042f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -28,7 +28,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; @@ -69,7 +68,7 @@ public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements R } @Override - 
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { return new WriteCSVResult(csvFormat, schema, getSchemaAccessWriter(schema), out, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), includeHeader); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 54a23333cc..30c7dd3e3b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -35,7 +35,6 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schema.access.SchemaAccessStrategy; @@ -203,20 +202,20 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } @Override - protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { - if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { + if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { return createAccessStrategy(); } else { - return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + return super.getSchemaAccessStrategy(strategy, schemaRegistry, context); } } @Override - protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { - if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ValidationContext context) { + if (strategy.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { return createAccessStrategy(); } else { - return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + return super.getSchemaAccessStrategy(strategy, schemaRegistry, context); } } @@ -224,8 +223,9 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac return new SchemaAccessStrategy() { private final Set schemaFields = EnumSet.noneOf(SchemaField.class); + @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws 
SchemaNotFoundException { + public RecordSchema getSchema(Map variables, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException { return recordSchema; } @@ -237,8 +237,8 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { - final RecordSchema schema = getSchema(flowFile, in, null); + public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { + final RecordSchema schema = getSchema(variables, in, null); return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java index 45cbbd1d43..cab4449dff 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -34,7 +35,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeUtils; @@ -128,8 +128,8 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { - final RecordSchema schema = getSchema(flowFile, in, null); + public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { + final RecordSchema schema = getSchema(variables, in, null); return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java index 9a722c1ceb..cbe2f59ead 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -27,7 +27,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; @@ -64,7 +63,7 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException { return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null)); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index 063c9df094..829028436a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; @@ -28,7 +29,6 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.DateTimeUtils; @@ -69,7 +69,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade } @Override - public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { - return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, null), dateFormat, timeFormat, timestampFormat); + public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { + return new 
JsonTreeRowRecordReader(in, logger, getSchema(variables, in, null), dateFormat, timeFormat, timestampFormat); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index fb28b17644..4853e4b1fb 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -147,22 +147,22 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic return schemaAccessStrategyList; } - protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) { - if (allowableValue == null) { + protected SchemaAccessWriter getSchemaWriteStrategy(final String strategy) { + if (strategy == null) { return null; } - if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) { + if (strategy.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) { return new SchemaNameAsAttribute(); - } else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) { + } else if (strategy.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) { return new WriteAvroSchemaAttributeStrategy(); - } else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { + } else if (strategy.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) { return new HortonworksEncodedSchemaReferenceWriter(); - } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { + } else if (strategy.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { return new HortonworksAttributeSchemaReferenceWriter(); - } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { + } else if (strategy.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { return new ConfluentSchemaRegistryWriter(); - } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) { + } else if (strategy.equalsIgnoreCase(NO_SCHEMA.getValue())) { return new NopSchemaAccessWriter(); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index 53b030a92a..502d548513 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -24,7 +24,6 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; import 
org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaField; @@ -41,6 +40,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA; @@ -57,7 +57,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService { private volatile ConfigurationContext configurationContext; private volatile SchemaAccessStrategy schemaAccessStrategy; - private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); + private static InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); private final List strategyList = Collections.unmodifiableList(Arrays.asList( SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)); @@ -108,22 +108,17 @@ public abstract class SchemaRegistryService extends AbstractControllerService { return schemaAccessStrategy; } - public final RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + public final RecordSchema getSchema(final Map variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(); if (accessStrategy == null) { throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service"); } - return getSchemaAccessStrategy().getSchema(flowFile, contentStream, readSchema); + return getSchemaAccessStrategy().getSchema(variables, contentStream, readSchema); } - public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { - final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(); - if (accessStrategy == null) { - throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service"); - } - - return getSchemaAccessStrategy().getSchema(flowFile, EMPTY_INPUT_STREAM, readSchema); + public RecordSchema getSchema(final Map variables, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { + return getSchema(variables, EMPTY_INPUT_STREAM, readSchema); } @Override @@ -148,12 +143,12 @@ public abstract class SchemaRegistryService extends AbstractControllerService { return suppliedFields; } - protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { - if (allowableValue == null) { + protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { + if (strategy == null) { return null; } - return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + return SchemaAccessUtils.getSchemaAccessStrategy(strategy, schemaRegistry, context); } protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java index 7057e4378d..1ed2abad09 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java @@ -17,7 +17,6 @@ package org.apache.nifi.text; -import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; import java.util.ArrayList; @@ -29,10 +28,11 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; -import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schema.access.InheritSchemaFromRecord; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter; @@ -77,12 +77,13 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final FlowFile flowFile, final OutputStream out) { + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) { return new FreeFormTextWriter(textValue, characterSet, out); } @Override - public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { - return readSchema; + protected SchemaAccessStrategy getSchemaAccessStrategy(String strategy, SchemaRegistry schemaRegistry, ConfigurationContext context) { + return new InheritSchemaFromRecord(); } + }
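
A minimal usage sketch of the Map-based reader API above, as it might be called from a processor's onTrigger. ExampleReaderUsage and countRecords are hypothetical names used for illustration; only the RecordReaderFactory and ProcessSession calls reflect the interfaces changed in this patch.

import java.io.InputStream;
import java.util.Map;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;

public class ExampleReaderUsage {

    // Count the records in a FlowFile using the new createRecordReader(variables, in, logger) method.
    static long countRecords(final ProcessSession session, final FlowFile flowFile,
                             final RecordReaderFactory readerFactory, final ComponentLog logger) throws Exception {
        // The FlowFile's attributes now serve as the 'variables' used to resolve the schema
        // via Expression Language; the reader no longer needs the FlowFile itself.
        final Map<String, String> variables = flowFile.getAttributes();

        long count = 0;
        try (final InputStream in = session.read(flowFile);
             final RecordReader reader = readerFactory.createRecordReader(variables, in, logger)) {
            Record record;
            while ((record = reader.nextRecord()) != null) {
                count++;
            }
        }
        return count;
    }
}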
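
Likewise, a sketch of the writer side under the new signatures: the write schema is resolved from a variables Map plus the schema read from the input, and the writer is created from a schema and an OutputStream only. ExampleWriterUsage and convertRecords are hypothetical names; the stream and attribute handling follows the pattern used by the processors touched in this patch.

import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

public class ExampleWriterUsage {

    // Copy records from the input FlowFile to a new FlowFile using the FlowFile-free
    // getSchema(variables, readSchema) and createWriter(logger, schema, out) methods.
    static FlowFile convertRecords(final ProcessSession session, final FlowFile input,
                                   final RecordReaderFactory readerFactory,
                                   final RecordSetWriterFactory writerFactory,
                                   final ComponentLog logger) throws Exception {
        final Map<String, String> attributes = input.getAttributes();
        FlowFile output = session.create(input);

        final WriteResult result;
        final String mimeType;
        try (final InputStream in = session.read(input);
             final RecordReader reader = readerFactory.createRecordReader(attributes, in, logger)) {

            // Resolve the schema for writing from the incoming attributes and the schema that was read.
            final RecordSchema writeSchema = writerFactory.getSchema(attributes, reader.getSchema());

            try (final OutputStream out = session.write(output);
                 final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, out)) {
                writer.beginRecordSet();
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    writer.write(record);
                }
                result = writer.finishRecordSet();
                mimeType = writer.getMimeType();
            }
        }

        // Attributes are updated only after the content streams have been closed.
        output = session.putAttribute(output, "record.count", String.valueOf(result.getRecordCount()));
        output = session.putAttribute(output, "mime.type", mimeType);
        return output;
    }
}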