From e7dcb6f6c5665a2a2b8b96b39b76417f92e4f190 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Wed, 7 Jun 2017 13:42:13 -0400
Subject: [PATCH] NIFI-3921: Allow Record Writers to inherit schema from Record
Signed-off-by: Matt Burgess
This closes #1902
---
.../org/apache/nifi/avro/AvroTypeUtil.java | 177 +++++++++++++++---
.../schema/access/AvroSchemaTextStrategy.java | 2 +-
.../nifi/schema/access/SchemaAccessUtils.java | 10 +
.../WriteAvroSchemaAttributeStrategy.java} | 26 +--
.../apache/nifi/avro/TestAvroTypeUtil.java | 103 ++++++++++
.../hadoop/AbstractFetchHDFSRecord.java | 66 +++----
.../hadoop/AbstractPutHDFSRecord.java | 95 ++--------
.../record/MockRecordWriter.java | 3 +-
...worksAttributeSchemaReferenceStrategy.java | 2 +-
...onworksEncodedSchemaReferenceStrategy.java | 2 +-
.../access/InheritSchemaFromRecord.java | 44 +++++
.../schema/access/NopSchemaAccessWriter.java | 49 +++++
.../schema/access/SchemaAccessStrategy.java | 3 +-
.../access/SchemaNamePropertyStrategy.java | 2 +-
.../kafka/pubsub/ConsumerLease.java | 2 +-
.../kafka/pubsub/PublishKafkaRecord_0_10.java | 21 +--
.../kafka/pubsub/PublisherLease.java | 4 +-
.../pubsub/TestPublishKafkaRecord_0_10.java | 18 +-
.../kafka/pubsub/util/MockRecordWriter.java | 3 +-
.../nifi/processors/parquet/PutParquet.java | 2 +-
.../processors/parquet/FetchParquetTest.java | 3 +-
.../processors/parquet/PutParquetTest.java | 68 ++-----
.../script/ScriptedRecordSetWriter.java | 22 +--
.../script/ScriptedRecordSetWriterTest.groovy | 4 +-
.../groovy/test_record_writer_inline.groovy | 2 +-
.../standard/AbstractRecordProcessor.java | 41 ++--
.../standard/AbstractRouteRecord.java | 13 +-
.../processors/standard/PartitionRecord.java | 12 +-
.../nifi/processors/standard/QueryRecord.java | 25 +--
.../nifi/processors/standard/SplitRecord.java | 12 +-
.../processors/standard/TestQueryRecord.java | 19 +-
.../serialization/RecordSetWriterFactory.java | 5 +-
.../java/org/apache/nifi/avro/AvroReader.java | 2 +-
.../apache/nifi/avro/AvroRecordSetWriter.java | 2 +-
.../EmbeddedAvroSchemaAccessStrategy.java | 2 +-
.../nifi/csv/CSVHeaderSchemaStrategy.java | 2 +-
.../java/org/apache/nifi/csv/CSVReader.java | 2 +-
.../java/org/apache/nifi/grok/GrokReader.java | 4 +-
.../org/apache/nifi/json/JsonPathReader.java | 2 +-
.../org/apache/nifi/json/JsonTreeReader.java | 2 +-
.../SchemaRegistryRecordSetWriter.java | 31 ++-
.../serialization/SchemaRegistryService.java | 19 +-
.../text/FreeFormTextRecordSetWriter.java | 9 +-
.../TestWriteAvroResultWithoutSchema.java | 4 +-
.../nifi/csv/TestCSVHeaderSchemaStrategy.java | 2 +-
45 files changed, 573 insertions(+), 370 deletions(-)
rename nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/{nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java => nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java} (57%)
create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java
create mode 100644 nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index 76438c5212..87139c6ced 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -17,33 +17,6 @@
package org.apache.nifi.avro;
-import org.apache.avro.Conversions;
-import org.apache.avro.LogicalType;
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-import org.apache.avro.JsonProperties;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.DataType;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.apache.nifi.serialization.record.util.DataTypeUtils;
-import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;
@@ -61,6 +34,36 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class AvroTypeUtil {
private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
public static final String AVRO_SCHEMA_FORMAT = "avro";
@@ -72,30 +75,142 @@ public class AvroTypeUtil {
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOGICAL_TYPE_DECIMAL = "decimal";
- public static Schema extractAvroSchema(final RecordSchema recordSchema) throws SchemaNotFoundException {
+ public static Schema extractAvroSchema(final RecordSchema recordSchema) {
if (recordSchema == null) {
throw new IllegalArgumentException("RecordSchema cannot be null");
}
final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat();
if (!schemaFormatOption.isPresent()) {
- throw new SchemaNotFoundException("No Schema Format was present in the RecordSchema");
+ return buildAvroSchema(recordSchema);
}
final String schemaFormat = schemaFormatOption.get();
if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) {
- throw new SchemaNotFoundException("Schema provided is not in Avro format");
+ return buildAvroSchema(recordSchema);
}
final Optional<String> textOption = recordSchema.getSchemaText();
if (!textOption.isPresent()) {
- throw new SchemaNotFoundException("No Schema text was present in the RecordSchema");
+ return buildAvroSchema(recordSchema);
}
final String text = textOption.get();
return new Schema.Parser().parse(text);
}
+ private static Schema buildAvroSchema(final RecordSchema recordSchema) {
+ final List<Field> avroFields = new ArrayList<>(recordSchema.getFieldCount());
+ for (final RecordField recordField : recordSchema.getFields()) {
+ avroFields.add(buildAvroField(recordField));
+ }
+
+ final Schema avroSchema = Schema.createRecord("nifiRecord", null, "org.apache.nifi", false, avroFields);
+ return avroSchema;
+ }
+
+ private static Field buildAvroField(final RecordField recordField) {
+ final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName());
+ final Field field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue());
+ for (final String alias : recordField.getAliases()) {
+ field.addAlias(alias);
+ }
+
+ return field;
+ }
+
+ private static Schema buildAvroSchema(final DataType dataType, final String fieldName) {
+ final Schema schema;
+
+ switch (dataType.getFieldType()) {
+ case ARRAY:
+ final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+ final DataType elementDataType = arrayDataType.getElementType();
+ if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) {
+ schema = Schema.create(Type.BYTES);
+ } else {
+ final Schema elementType = buildAvroSchema(elementDataType, fieldName);
+ schema = Schema.createArray(elementType);
+ }
+ break;
+ case BIGINT:
+ schema = Schema.create(Type.STRING);
+ break;
+ case BOOLEAN:
+ schema = Schema.create(Type.BOOLEAN);
+ break;
+ case BYTE:
+ schema = Schema.create(Type.INT);
+ break;
+ case CHAR:
+ schema = Schema.create(Type.STRING);
+ break;
+ case CHOICE:
+ final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
+ final List<DataType> options = choiceDataType.getPossibleSubTypes();
+
+ final List<Schema> unionTypes = new ArrayList<>(options.size());
+ for (final DataType option : options) {
+ unionTypes.add(buildAvroSchema(option, fieldName));
+ }
+
+ schema = Schema.createUnion(unionTypes);
+ break;
+ case DATE:
+ schema = Schema.create(Type.INT);
+ LogicalTypes.date().addToSchema(schema);
+ break;
+ case DOUBLE:
+ schema = Schema.create(Type.DOUBLE);
+ break;
+ case FLOAT:
+ schema = Schema.create(Type.FLOAT);
+ break;
+ case INT:
+ schema = Schema.create(Type.INT);
+ break;
+ case LONG:
+ schema = Schema.create(Type.LONG);
+ break;
+ case MAP:
+ schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName));
+ break;
+ case RECORD:
+ final RecordDataType recordDataType = (RecordDataType) dataType;
+ final RecordSchema childSchema = recordDataType.getChildSchema();
+
+ final List<Field> childFields = new ArrayList<>(childSchema.getFieldCount());
+ for (final RecordField field : childSchema.getFields()) {
+ childFields.add(buildAvroField(field));
+ }
+
+ schema = Schema.createRecord(fieldName + "Type", null, "org.apache.nifi", false, childFields);
+ break;
+ case SHORT:
+ schema = Schema.create(Type.INT);
+ break;
+ case STRING:
+ schema = Schema.create(Type.STRING);
+ break;
+ case TIME:
+ schema = Schema.create(Type.INT);
+ LogicalTypes.timeMillis().addToSchema(schema);
+ break;
+ case TIMESTAMP:
+ schema = Schema.create(Type.LONG);
+ LogicalTypes.timestampMillis().addToSchema(schema);
+ break;
+ default:
+ return null;
+ }
+
+ return nullable(schema);
+ }
+
+ private static Schema nullable(final Schema schema) {
+ return Schema.createUnion(Schema.create(Type.NULL), schema);
+ }
+
/**
* Returns a DataType for the given Avro Schema
*
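For illustration, a minimal sketch of the new fallback (field names hypothetical, not from this patch): a RecordSchema that carries no Avro schema text is now converted to an Avro schema instead of triggering a SchemaNotFoundException.

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
    final RecordSchema recordSchema = new SimpleRecordSchema(fields); // no schema text or format set

    final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
    // Every field is wrapped in a nullable union, e.g. "name" -> ["null", "string"]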
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
index 3155909cda..5bf084ef29 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/AvroSchemaTextStrategy.java
@@ -40,7 +40,7 @@ public class AvroSchemaTextStrategy implements SchemaAccessStrategy {
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaText = schemaTextPropertyValue.evaluateAttributeExpressions(flowFile).getValue();
if (schemaText == null || schemaText.trim().isEmpty()) {
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Text");
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index cab9c02b83..b335b1156a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -43,6 +43,10 @@ public class SchemaAccessUtils {
+ "found at https://github.com/hortonworks/registry");
public static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
"The FlowFile contains 3 Attributes that will be used to lookup a Schema from the configured Schema Registry: 'schema.identifier', 'schema.version', and 'schema.protocol.version'");
+ public static final AllowableValue INHERIT_RECORD_SCHEMA = new AllowableValue("inherit-record-schema", "Inherit Record Schema",
+ "The schema used to write records will be the same schema that was given to the Record when the Record was created.");
+
+
public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
.name("schema-registry")
@@ -117,6 +121,8 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ProcessContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+ } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+ return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
@@ -131,6 +137,8 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+ } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+ return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
@@ -145,6 +153,8 @@ public class SchemaAccessUtils {
public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME));
+ } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
+ return new InheritSchemaFromRecord();
} else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {
return new AvroSchemaTextStrategy(context.getProperty(SCHEMA_TEXT));
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
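A minimal sketch of how the new allowable value resolves (a context object supplied by the framework is assumed; the code above never touches the Schema Registry on this branch, so null is acceptable here):

    final SchemaAccessStrategy strategy = SchemaAccessUtils.getSchemaAccessStrategy(
            SchemaAccessUtils.INHERIT_RECORD_SCHEMA.getValue(), null, context);
    // strategy is an InheritSchemaFromRecord; it ignores the FlowFile content entirely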
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
similarity index 57%
rename from nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
rename to nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
index f39bdca2da..d9be6732a0 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaTextAsAttribute.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/WriteAvroSchemaAttributeStrategy.java
@@ -17,44 +17,36 @@
package org.apache.nifi.schema.access;
+import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.serialization.record.RecordSchema;
-public class SchemaTextAsAttribute implements SchemaAccessWriter {
- private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
+public class WriteAvroSchemaAttributeStrategy implements SchemaAccessWriter {
@Override
- public void writeHeader(final RecordSchema schema, final OutputStream out) {
+ public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
}
@Override
public Map<String, String> getAttributes(final RecordSchema schema) {
- final Optional<String> textFormatOption = schema.getSchemaFormat();
- final Optional<String> textOption = schema.getSchemaText();
- return Collections.singletonMap(textFormatOption.get() + ".schema", textOption.get());
+ final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+ final String schemaText = avroSchema.toString();
+ return Collections.singletonMap("avro.schema", schemaText);
}
@Override
public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
- final Optional<String> textFormatOption = schema.getSchemaFormat();
- if (!textFormatOption.isPresent()) {
- throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text Format is not present");
- }
-
- final Optional<String> textOption = schema.getSchemaText();
- if (!textOption.isPresent()) {
- throw new SchemaNotFoundException("Cannot write Schema Text as Attribute because the Schema's Text is not present");
- }
}
@Override
public Set<SchemaField> getRequiredSchemaFields() {
- return schemaFields;
+ return EnumSet.noneOf(SchemaField.class);
}
}
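With this rename the strategy always writes the schema under a fixed attribute name, converting non-Avro schemas via AvroTypeUtil.extractAvroSchema. A sketch of the resulting attribute (recordSchema assumed in scope):

    final Map<String, String> attributes =
            new WriteAvroSchemaAttributeStrategy().getAttributes(recordSchema);
    // attributes holds a single entry: "avro.schema" -> the schema rendered as Avro JSON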
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
new file mode 100644
index 0000000000..fe1973351f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/test/java/org/apache/nifi/avro/TestAvroTypeUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.junit.Test;
+
+public class TestAvroTypeUtil {
+
+ @Test
+ public void testCreateAvroSchemaPrimitiveTypes() throws SchemaNotFoundException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("int", RecordFieldType.INT.getDataType()));
+ fields.add(new RecordField("long", RecordFieldType.LONG.getDataType()));
+ fields.add(new RecordField("string", RecordFieldType.STRING.getDataType(), "hola", Collections.singleton("greeting")));
+ fields.add(new RecordField("byte", RecordFieldType.BYTE.getDataType()));
+ fields.add(new RecordField("char", RecordFieldType.CHAR.getDataType()));
+ fields.add(new RecordField("short", RecordFieldType.SHORT.getDataType()));
+ fields.add(new RecordField("double", RecordFieldType.DOUBLE.getDataType()));
+ fields.add(new RecordField("float", RecordFieldType.FLOAT.getDataType()));
+ fields.add(new RecordField("time", RecordFieldType.TIME.getDataType()));
+ fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
+ fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
+
+ final DataType arrayType = RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType());
+ fields.add(new RecordField("strings", arrayType));
+
+ final DataType mapType = RecordFieldType.MAP.getMapDataType(RecordFieldType.LONG.getDataType());
+ fields.add(new RecordField("map", mapType));
+
+
+ final List<RecordField> personFields = new ArrayList<>();
+ personFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+ personFields.add(new RecordField("dob", RecordFieldType.DATE.getDataType()));
+ final RecordSchema personSchema = new SimpleRecordSchema(personFields);
+ final DataType personType = RecordFieldType.RECORD.getRecordDataType(personSchema);
+ fields.add(new RecordField("person", personType));
+
+
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
+
+ // everything should be a union, since it's nullable.
+ for (final Field field : avroSchema.getFields()) {
+ final Schema fieldSchema = field.schema();
+ assertEquals(Type.UNION, fieldSchema.getType());
+ assertTrue("Field " + field.name() + " does not contain NULL type", fieldSchema.getTypes().contains(Schema.create(Type.NULL)));
+ }
+
+ final RecordSchema afterConversion = AvroTypeUtil.createSchema(avroSchema);
+
+ assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("int").get());
+ assertEquals(RecordFieldType.LONG.getDataType(), afterConversion.getDataType("long").get());
+ assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("string").get());
+ assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("byte").get());
+ assertEquals(RecordFieldType.STRING.getDataType(), afterConversion.getDataType("char").get());
+ assertEquals(RecordFieldType.INT.getDataType(), afterConversion.getDataType("short").get());
+ assertEquals(RecordFieldType.DOUBLE.getDataType(), afterConversion.getDataType("double").get());
+ assertEquals(RecordFieldType.FLOAT.getDataType(), afterConversion.getDataType("float").get());
+ assertEquals(RecordFieldType.TIME.getDataType(), afterConversion.getDataType("time").get());
+ assertEquals(RecordFieldType.DATE.getDataType(), afterConversion.getDataType("date").get());
+ assertEquals(RecordFieldType.TIMESTAMP.getDataType(), afterConversion.getDataType("timestamp").get());
+ assertEquals(arrayType, afterConversion.getDataType("strings").get());
+ assertEquals(mapType, afterConversion.getDataType("map").get());
+ assertEquals(personType, afterConversion.getDataType("person").get());
+
+ final RecordField stringField = afterConversion.getField("string").get();
+ assertEquals("hola", stringField.getDefaultValue());
+ assertEquals(Collections.singleton("greeting"), stringField.getAliases());
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
index 7cc6bb5703..fbbbbf4eba 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractFetchHDFSRecord.java
@@ -16,7 +16,22 @@
*/
package org.apache.nifi.processors.hadoop;
-import org.apache.commons.io.input.NullInputStream;
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,29 +52,11 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* Base processor for reading data from HDFS that can be fetched into records.
*/
@@ -187,7 +184,6 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, new NullInputStream(0));
final StopWatch stopWatch = new StopWatch(true);
@@ -200,22 +196,20 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
try (final BufferedOutputStream out = new BufferedOutputStream(rawOut);
final HDFSRecordReader recordReader = createHDFSRecordReader(context, originalFlowFile, configuration, path)) {
- final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
-
- final RecordSet recordSet = new RecordSet() {
- @Override
- public RecordSchema getSchema() throws IOException {
- return emptySchema;
- }
-
- @Override
- public Record next() throws IOException {
- return recordReader.nextRecord();
- }
- };
+ Record record = recordReader.nextRecord();
+ final RecordSchema schema = recordSetWriterFactory.getSchema(originalFlowFile, record == null ? null : record.getSchema());
try (final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(getLogger(), schema, writableFlowFile, out)) {
- writeResult.set(recordSetWriter.write(recordSet));
+ recordSetWriter.beginRecordSet();
+ if (record != null) {
+ recordSetWriter.write(record);
+ }
+
+ while ((record = recordReader.nextRecord()) != null) {
+ recordSetWriter.write(record);
+ }
+
+ writeResult.set(recordSetWriter.finishRecordSet());
mimeTypeRef.set(recordSetWriter.getMimeType());
}
} catch (Exception e) {
@@ -247,7 +241,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
} catch (final FileNotFoundException | AccessControlException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, e});
- final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage());
+ final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, e.getMessage() == null ? e.toString() : e.getMessage());
session.transfer(failureFlowFile, REL_FAILURE);
} catch (final IOException | FlowFileAccessException e) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to retry", new Object[] {filenameValue, originalFlowFile, e});
@@ -255,7 +249,7 @@ public abstract class AbstractFetchHDFSRecord extends AbstractHadoopProcessor {
context.yield();
} catch (final Throwable t) {
getLogger().error("Failed to retrieve content from {} for {} due to {}; routing to failure", new Object[] {filenameValue, originalFlowFile, t});
- final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage());
+ final FlowFile failureFlowFile = session.putAttribute(originalFlowFile, FETCH_FAILURE_REASON_ATTR, t.getMessage() == null ? t.toString() : t.getMessage());
session.transfer(failureFlowFile, REL_FAILURE);
}
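The hunk above introduces a read-ahead pattern that any schema-inheriting writer can use: peek at the first record, derive the write schema from it, then stream the remainder. A condensed sketch (the factories, streams, and writeResult holder shown in the diff are assumed in scope):

    Record first = recordReader.nextRecord(); // peek so the writer can inherit the schema
    final RecordSchema schema = recordSetWriterFactory.getSchema(flowFile,
            first == null ? null : first.getSchema());
    try (final RecordSetWriter writer = recordSetWriterFactory.createWriter(getLogger(), schema, flowFile, out)) {
        writer.beginRecordSet();
        for (Record r = first; r != null; r = recordReader.nextRecord()) {
            writer.write(r);
        }
        writeResult.set(writer.finishRecordSet());
    }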
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
index 6676ee62d3..70a3697c27 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-hadoop-record-utils/src/main/java/org/apache/nifi/processors/hadoop/AbstractPutHDFSRecord.java
@@ -16,6 +16,21 @@
*/
package org.apache.nifi.processors.hadoop;
+import java.io.BufferedInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -28,8 +43,6 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
@@ -42,10 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.exception.RecordReaderFactoryException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
-import org.apache.nifi.schema.access.SchemaAccessStrategy;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
@@ -53,32 +63,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.StopWatch;
-import java.io.BufferedInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
-import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
-
/**
* Base class for processors that write Records to HDFS.
*/
@@ -156,18 +140,10 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
private volatile String remoteOwner;
private volatile String remoteGroup;
- private volatile SchemaAccessStrategy schemaAccessStrategy;
private volatile Set<Relationship> putHdfsRecordRelationships;
private volatile List<PropertyDescriptor> putHdfsRecordProperties;
- private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
- SCHEMA_NAME_PROPERTY,
- SCHEMA_TEXT_PROPERTY,
- HWX_SCHEMA_REF_ATTRIBUTES,
- HWX_CONTENT_ENCODED_SCHEMA
- ));
-
@Override
protected final void init(final ProcessorInitializationContext context) {
@@ -187,19 +163,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
.description("The parent directory to which files should be written. Will be created if it doesn't exist.")
.build());
- final AllowableValue[] strategies = getSchemaAccessStrategyValues().toArray(new AllowableValue[0]);
-
- props.add(new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(SCHEMA_ACCESS_STRATEGY)
- .description("Specifies how to obtain the schema that is to be used for writing the data.")
- .allowableValues(strategies)
- .defaultValue(getDefaultSchemaAccessStrategy().getValue())
- .build());
-
- props.add(SCHEMA_REGISTRY);
- props.add(SCHEMA_NAME);
- props.add(SCHEMA_TEXT);
-
final AllowableValue[] compressionTypes = getCompressionTypes(context).toArray(new AllowableValue[0]);
props.add(new PropertyDescriptor.Builder()
@@ -216,18 +179,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
this.putHdfsRecordProperties = Collections.unmodifiableList(props);
}
- protected List<AllowableValue> getSchemaAccessStrategyValues() {
- return strategyList;
- }
-
- protected AllowableValue getDefaultSchemaAccessStrategy() {
- return SCHEMA_NAME_PROPERTY;
- }
-
- private PropertyDescriptor getSchemaAcessStrategyDescriptor() {
- return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
- }
-
/**
* @param context the initialization context
* @return the possible compression types
@@ -259,22 +210,11 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
return putHdfsRecordProperties;
}
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- final String schemaAccessStrategy = validationContext.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
- return SchemaAccessUtils.validateSchemaAccessStrategy(validationContext, schemaAccessStrategy, getSchemaAccessStrategyValues());
- }
@OnScheduled
public final void onScheduled(final ProcessContext context) throws IOException {
super.abstractOnScheduled(context);
- final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
-
- final PropertyDescriptor descriptor = getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
- final String schemaAccess = context.getProperty(descriptor).getValue();
- this.schemaAccessStrategy = SchemaAccessUtils.getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
-
this.remoteOwner = context.getProperty(REMOTE_OWNER).getValue();
this.remoteGroup = context.getProperty(REMOTE_GROUP).getValue();
@@ -365,8 +305,6 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
HDFSRecordWriter recordWriter = null;
try (final BufferedInputStream in = new BufferedInputStream(rawIn)) {
- final RecordSchema destRecordSchema = schemaAccessStrategy.getSchema(flowFile, in);
- recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, destRecordSchema);
// if we fail to create the RecordReader then we want to route to failure, so we need to
// handle this separately from the other IOExceptions which normally route to retry
@@ -379,8 +317,9 @@ public abstract class AbstractPutHDFSRecord extends AbstractHadoopProcessor {
}
final RecordSet recordSet = recordReader.createRecordSet();
- writeResult.set(recordWriter.write(recordSet));
+ recordWriter = createHDFSRecordWriter(context, flowFile, configuration, tempFile, recordReader.getSchema());
+ writeResult.set(recordWriter.write(recordSet));
} catch (Exception e) {
exceptionHolder.set(e);
} finally {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
index 891bbe35a4..9bde6473b1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockRecordWriter.java
@@ -18,7 +18,6 @@
package org.apache.nifi.serialization.record;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
@@ -55,7 +54,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream content) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema schema) throws SchemaNotFoundException, IOException {
return new SimpleRecordSchema(Collections.emptyList());
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index d1f0e8ac87..073a45359c 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -47,7 +47,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final String schemaIdentifier = flowFile.getAttribute(SCHEMA_ID_ATTRIBUTE);
final String schemaVersion = flowFile.getAttribute(SCHEMA_VERSION_ATTRIBUTE);
final String schemaProtocol = flowFile.getAttribute(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index de89900c28..b2e5a48234 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -45,7 +45,7 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final byte[] buffer = new byte[13];
try {
StreamUtils.fillBuffer(contentStream, buffer);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java
new file mode 100644
index 0000000000..d1ed63db8d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/InheritSchemaFromRecord.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class InheritSchemaFromRecord implements SchemaAccessStrategy {
+
+ @Override
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ if (readSchema == null) {
+ throw new SchemaNotFoundException("Cannot inherit Schema from Record because no schema was found");
+ }
+
+ return readSchema;
+ }
+
+ @Override
+ public Set<SchemaField> getSuppliedSchemaFields() {
+ return EnumSet.allOf(SchemaField.class);
+ }
+
+}
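Usage sketch: a writer configured with the new Inherit Record Schema strategy simply reuses whatever schema the reader attached to the records (recordReader, flowFile, and contentStream assumed in scope):

    final RecordSchema readSchema = recordReader.getSchema();
    final RecordSchema writeSchema = new InheritSchemaFromRecord()
            .getSchema(flowFile, contentStream, readSchema);
    // writeSchema == readSchema; a null readSchema raises SchemaNotFoundException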
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java
new file mode 100644
index 0000000000..75dedc5ff2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/NopSchemaAccessWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.schema.access;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class NopSchemaAccessWriter implements SchemaAccessWriter {
+
+ @Override
+ public void writeHeader(RecordSchema schema, OutputStream out) throws IOException {
+ }
+
+ @Override
+ public Map<String, String> getAttributes(RecordSchema schema) {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void validateSchema(RecordSchema schema) throws SchemaNotFoundException {
+ }
+
+ @Override
+ public Set<SchemaField> getRequiredSchemaFields() {
+ return EnumSet.noneOf(SchemaField.class);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
index 68f9ecfcab..923eaf0725 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessStrategy.java
@@ -30,9 +30,10 @@ public interface SchemaAccessStrategy {
*
* @param flowFile flowfile
* @param contentStream content of flowfile
+ * @param readSchema the schema that was read from the input FlowFile, or null if there was none
* @return the RecordSchema for the FlowFile
*/
- RecordSchema getSchema(FlowFile flowFile, InputStream contentStream) throws SchemaNotFoundException, IOException;
+ RecordSchema getSchema(FlowFile flowFile, InputStream contentStream, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
* @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #getSchema(FlowFile, InputStream)}.
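To show the shape of the widened contract, a hypothetical strategy (not part of this patch) that prefers the record's own schema and otherwise falls back to a fixed one:

    public class PreferReadSchemaStrategy implements SchemaAccessStrategy {
        private final RecordSchema fallback;

        public PreferReadSchemaStrategy(final RecordSchema fallback) {
            this.fallback = fallback;
        }

        @Override
        public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) {
            // the new third argument carries the schema the reader attached, if any
            return readSchema != null ? readSchema : fallback;
        }

        @Override
        public Set<SchemaField> getSuppliedSchemaFields() {
            return EnumSet.noneOf(SchemaField.class);
        }
    }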
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
index d59e5daa69..796e1e4565 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
@@ -43,7 +43,7 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy {
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
final String schemaName = schemaNamePropertyValue.evaluateAttributeExpressions(flowFile).getValue();
if (schemaName.trim().isEmpty()) {
throw new SchemaNotFoundException("FlowFile did not contain appropriate attributes to determine Schema Name.");
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index ee6b1ff03a..242c917769 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -465,7 +465,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final RecordSchema writeSchema;
try {
- writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
+ writeSchema = writerFactory.getSchema(flowFile, recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
index e48568bc4c..21b2e314cb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -59,6 +59,7 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. "
@@ -309,6 +310,8 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
@@ -323,24 +326,16 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
- final RecordSchema schema;
- final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
- try (final InputStream in = new BufferedInputStream(session.read(flowFile))) {
- schema = writerFactory.getSchema(flowFile, in);
- } catch (final Exception e) {
- getLogger().error("Failed to determine Schema for writing messages to Kafka for {}; routing to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- continue;
- }
-
try {
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream rawIn) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn)) {
- final RecordReader reader = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class).createRecordReader(flowFile, in, getLogger());
- lease.publish(flowFile, reader, writerFactory, schema, messageKeyField, topic);
+ final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+ final RecordSet recordSet = reader.createRecordSet();
+
+ final RecordSchema schema = writerFactory.getSchema(flowFile, recordSet.getSchema());
+ lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 4238956d8b..2004346423 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -33,7 +33,6 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
@@ -96,7 +95,7 @@ public class PublisherLease implements Closeable {
}
}
- void publish(final FlowFile flowFile, final RecordReader reader, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
final String messageKeyField, final String topic) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker();
@@ -105,7 +104,6 @@ public class PublisherLease implements Closeable {
final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
Record record;
- final RecordSet recordSet = reader.createRecordSet();
int recordCount = 0;
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, flowFile, baos)) {
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
index 7cff2a719c..5c59c66a90 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -42,10 +42,10 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -65,7 +65,7 @@ public class TestPublishKafkaRecord_0_10 {
public void setup() throws InitializationException, IOException {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
- Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class),
+ Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
any(RecordSchema.class), any(String.class), any(String.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@@ -104,7 +104,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -123,7 +123,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -138,7 +138,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -155,7 +155,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_FAILURE, 3);
- verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@@ -177,7 +177,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
- verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
@@ -207,7 +207,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
- verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@@ -241,7 +241,7 @@ public class TestPublishKafkaRecord_0_10 {
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_SUCCESS, 2);
runner.assertTransferCount(PublishKafkaRecord_0_10.REL_FAILURE, 2);
- verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
+ verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
index 15496263a7..60e494b50e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -18,7 +18,6 @@
package org.apache.nifi.processors.kafka.pubsub.util;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
@@ -53,7 +52,7 @@ public class MockRecordWriter extends AbstractControllerService implements Recor
}
@Override
- public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(FlowFile flowFile, RecordSchema schema) throws SchemaNotFoundException, IOException {
return null;
}
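
A factory implementing the revised signature no longer receives the FlowFile content at all. The simplest useful implementation inherits the reader's schema, which is also what FreeFormTextRecordSetWriter does later in this patch:

    @Override
    public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
        // Inherit whatever schema the reader produced; a registry-backed writer
        // would consult its Schema Access Strategy here instead.
        return readSchema;
    }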
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 934eb59637..b9d2e42a39 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -185,7 +185,7 @@ public class PutParquet extends AbstractPutHDFSRecord {
return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
}
- private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
+ private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
builder.withConf(conf);
// Required properties
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
index 6ecfa59ef0..ffff2a3771 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -45,7 +45,6 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -217,7 +216,7 @@ public class FetchParquetTest {
configure(proc);
final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
- when(recordSetWriter.write(any(RecordSet.class))).thenThrow(new IOException("IOException"));
+ when(recordSetWriter.write(any(Record.class))).thenThrow(new IOException("IOException"));
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 3a07dcee23..e634e2e450 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -16,12 +16,25 @@
*/
package org.apache.nifi.processors.parquet;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -32,7 +45,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
@@ -49,21 +61,10 @@ import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.when;
-
public class PutParquetTest {
@@ -76,6 +77,10 @@ public class PutParquetTest {
private MockRecordParser readerFactory;
private TestRunner testRunner;
+ @BeforeClass
+ public static void setupLogging() {
+ BasicConfigurator.configure();
+ }
@Before
public void setup() throws IOException, InitializationException {
@@ -108,8 +113,6 @@ public class PutParquetTest {
testRunner.enableControllerService(readerFactory);
testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, schema.toString());
}
@Test
@@ -325,7 +328,6 @@ public class PutParquetTest {
@Test
public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException {
configure(proc, 10);
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
@@ -339,39 +341,6 @@ public class PutParquetTest {
testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
}
- @Test
- public void testSchemaWithELMissingShouldRouteToFailure() throws InitializationException, IOException {
- configure(proc, 10);
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
-
- final String filename = "testSchemaWithELMissingShouldRouteToFailure-" + System.currentTimeMillis();
-
- // don't provide my.schema as an attribute
- final Map<String, String> flowFileAttributes = new HashMap<>();
- flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
-
- testRunner.enqueue("trigger", flowFileAttributes);
- testRunner.run();
- testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
- }
-
- @Test
- public void testInvalidSchemaShouldRouteToFailure() throws InitializationException, IOException {
- configure(proc, 10);
- testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
-
- final String filename = "testInvalidSchemaShouldRouteToFailure-" + System.currentTimeMillis();
-
- // don't provide my.schema as an attribute
- final Map<String, String> flowFileAttributes = new HashMap<>();
- flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
- flowFileAttributes.put("my.schema", "NOT A SCHEMA");
-
- testRunner.enqueue("trigger", flowFileAttributes);
- testRunner.run();
- testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
- }
-
@Test
public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
configure(proc, 10);
@@ -427,6 +396,7 @@ public class PutParquetTest {
final RecordReader recordReader = Mockito.mock(RecordReader.class);
when(recordReader.createRecordSet()).thenReturn(recordSet);
+ when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema));
final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
index 8553c32740..c0160e650a 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java
@@ -16,6 +16,15 @@
*/
package org.apache.nifi.record.script;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.Collection;
+import java.util.HashSet;
+
+import javax.script.Invocable;
+import javax.script.ScriptException;
+
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@@ -30,15 +39,6 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
-import javax.script.Invocable;
-import javax.script.ScriptException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.Collection;
-import java.util.HashSet;
-
/**
* A RecordSetWriter implementation that allows the user to script the RecordWriter instance
*/
@@ -149,14 +149,14 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
@@ -119,30 +109,31 @@ public abstract class AbstractRecordProcessor extends AbstractProcessor {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
- try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger());
- final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+ try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
- writer.beginRecordSet();
+ final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema());
+ try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, original, out)) {
+ writer.beginRecordSet();
- Record record;
- while ((record = reader.nextRecord()) != null) {
- final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context);
- writer.write(processed);
+ Record record;
+ while ((record = reader.nextRecord()) != null) {
+ final Record processed = AbstractRecordProcessor.this.process(record, writeSchema, original, context);
+ writer.write(processed);
+ }
+
+ final WriteResult writeResult = writer.finishRecordSet();
+ attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.putAll(writeResult.getAttributes());
+ recordCount.set(writeResult.getRecordCount());
}
-
- final WriteResult writeResult = writer.finishRecordSet();
- attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
- attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
- attributes.putAll(writeResult.getAttributes());
- recordCount.set(writeResult.getRecordCount());
-
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException("Could not parse incoming data", e);
}
}
});
} catch (final Exception e) {
- getLogger().error("Failed to process {}", new Object[] {flowFile, e});
+ getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
return;
}
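
With the nested try-with-resources above, the write schema handed to subclasses now defaults to the input schema. A hypothetical subclass, for illustration only (the signature of process is taken from the call site above; the protected modifier is assumed):

    public class PassThroughRecordProcessor extends AbstractRecordProcessor {
        @Override
        protected Record process(final Record record, final RecordSchema writeSchema,
                final FlowFile flowFile, final ProcessContext context) {
            // A real processor would transform the record here; because the write
            // schema is inherited from the reader, none needs to be configured.
            return record;
        }
    }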
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
index e586a8296b..6664c7ba65 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRouteRecord.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -112,15 +111,7 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema writeSchema;
- try (final InputStream rawIn = session.read(flowFile);
- final InputStream in = new BufferedInputStream(rawIn)) {
- writeSchema = writerFactory.getSchema(flowFile, in);
- } catch (final Exception e) {
- getLogger().error("Failed to process records for {}; will route to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
+
final AtomicInteger numRecords = new AtomicInteger(0);
final Map<Relationship, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
@@ -131,6 +122,8 @@ public abstract class AbstractRouteRecord<T> extends AbstractProcessor {
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+ final RecordSchema writeSchema = writerFactory.getSchema(original, reader.getSchema());
+
Record record;
while ((record = reader.nextRecord()) != null) {
final Set<Relationship> relationships = route(record, writeSchema, original, context, flowFileContext);
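
Beyond the hunk, each matched relationship is served by a lazily created child FlowFile and writer, tracked in the writers map declared above. A sketch of that bookkeeping, in which the stream-returning session.write and the map's type parameters are assumptions for illustration:

    for (final Relationship relationship : relationships) {
        Tuple<FlowFile, RecordSetWriter> tuple = writers.get(relationship);
        if (tuple == null) {
            final FlowFile split = session.create(original);
            final OutputStream out = session.write(split);    // assumed stream-style API
            final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, split, out);
            writer.beginRecordSet();
            tuple = new Tuple<>(split, writer);
            writers.put(relationship, tuple);
        }
        tuple.getValue().write(record);    // route the record to this relationship's split
    }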
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
index e69cd72b1c..56267fecff 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PartitionRecord.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -175,15 +174,6 @@ public class PartitionRecord extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema writeSchema;
- try (final InputStream rawIn = session.read(flowFile);
- final InputStream in = new BufferedInputStream(rawIn)) {
- writeSchema = writerFactory.getSchema(flowFile, in);
- } catch (final Exception e) {
- getLogger().error("Failed to partition records for {}; will route to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- return;
- }
final Map<String, RecordPath> recordPaths;
try {
@@ -203,6 +193,8 @@ public class PartitionRecord extends AbstractProcessor {
try (final InputStream in = session.read(flowFile)) {
final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+ final RecordSchema writeSchema = writerFactory.getSchema(flowFile, reader.getSchema());
+
Record record;
while ((record = reader.nextRecord()) != null) {
final Map<String, List<ValueWrapper>> recordMap = new HashMap<>();
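
Past the end of this hunk, each record's partition key is built by evaluating the configured record paths. A sketch of that evaluation against the nifi-record-path API, with the value wrapping used for equality simplified away:

    final Map<String, List<Object>> partitionKey = new HashMap<>();
    for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
        final RecordPathResult result = entry.getValue().evaluate(record);
        partitionKey.put(entry.getKey(), result.getSelectedFields()
            .map(FieldValue::getValue)
            .collect(Collectors.toList()));
    }
    // Records that produce equal keys are written to the same child FlowFile.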
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 8a9a7714e6..f0ed52da3b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.processors.standard;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -72,9 +71,10 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.queryrecord.FlowFileTable;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
@@ -241,19 +241,19 @@ public class QueryRecord extends AbstractProcessor {
final StopWatch stopWatch = new StopWatch(true);
- final RecordSetWriterFactory resultSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY)
- .asControllerService(RecordSetWriterFactory.class);
- final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY)
- .asControllerService(RecordReaderFactory.class);
+ final RecordSetWriterFactory recordSetWriterFactory = context.getProperty(RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
+ final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
final Map<FlowFile, Relationship> transformedFlowFiles = new HashMap<>();
final Set<FlowFile> createdFlowFiles = new HashSet<>();
+ // Determine the schema for writing the data
final RecordSchema recordSchema;
+ try (final InputStream rawIn = session.read(original)) {
+ final RecordReader reader = recordReaderFactory.createRecordReader(original, rawIn, getLogger());
+ final RecordSchema inputSchema = reader.getSchema();
- try (final InputStream rawIn = session.read(original);
- final InputStream in = new BufferedInputStream(rawIn)) {
- recordSchema = resultSetWriterFactory.getSchema(original, in);
+ recordSchema = recordSetWriterFactory.getSchema(original, inputSchema);
} catch (final Exception e) {
getLogger().error("Failed to determine Record Schema from {}; routing to failure", new Object[] {original, e});
session.transfer(original, REL_FAILURE);
@@ -281,9 +281,9 @@ public class QueryRecord extends AbstractProcessor {
final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>();
final QueryResult queryResult;
if (context.getProperty(CACHE_SCHEMA).asBoolean()) {
- queryResult = queryWithCache(session, original, sql, context, recordParserFactory);
+ queryResult = queryWithCache(session, original, sql, context, recordReaderFactory);
} else {
- queryResult = query(session, original, sql, context, recordParserFactory);
+ queryResult = query(session, original, sql, context, recordReaderFactory);
}
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
@@ -293,7 +293,7 @@ public class QueryRecord extends AbstractProcessor {
transformed = session.write(transformed, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
- try (final RecordSetWriter resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
+ try (final RecordSetWriter resultSetWriter = recordSetWriterFactory.createWriter(getLogger(), recordSchema, outFlowFile, out)) {
final ResultSetRecordSet resultSet = new ResultSetRecordSet(rs);
writeResultRef.set(resultSetWriter.write(resultSet));
mimeTypeRef.set(resultSetWriter.getMimeType());
@@ -362,6 +362,7 @@ public class QueryRecord extends AbstractProcessor {
session.adjustCounter("Records Read", recordsRead, false);
}
+
private synchronized CachedStatement getStatement(final String sql, final Supplier<CalciteConnection> connectionSupplier, final ProcessSession session,
final FlowFile flowFile, final RecordReaderFactory recordReaderFactory) throws SQLException {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
index 853e88d2bb..00546d2dfb 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -17,7 +17,6 @@
package org.apache.nifi.processors.standard;
-import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -132,15 +131,6 @@ public class SplitRecord extends AbstractProcessor {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
- final RecordSchema schema;
- try (final InputStream rawIn = session.read(original);
- final InputStream in = new BufferedInputStream(rawIn)) {
- schema = writerFactory.getSchema(original, in);
- } catch (final Exception e) {
- getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e});
- session.transfer(original, REL_FAILURE);
- return;
- }
final int maxRecords = context.getProperty(RECORDS_PER_SPLIT).evaluateAttributeExpressions(original).asInteger();
@@ -151,6 +141,8 @@ public class SplitRecord extends AbstractProcessor {
public void process(final InputStream in) throws IOException {
try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) {
+ final RecordSchema schema = writerFactory.getSchema(original, reader.getSchema());
+
final RecordSet recordSet = reader.createRecordSet();
final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
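
The loop that follows (outside the hunk) drains the push-back set in batches of RECORDS_PER_SPLIT. In outline, assuming PushBackRecordSet's isAnotherRecord()/next() methods and a stream-returning session.write:

    while (pushbackSet.isAnotherRecord()) {
        final FlowFile split = session.create(original);    // one child FlowFile per batch
        final OutputStream out = session.write(split);
        try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, split, out)) {
            writer.beginRecordSet();
            Record record;
            for (int i = 0; i < maxRecords && (record = pushbackSet.next()) != null; i++) {
                writer.write(record);                       // at most maxRecords per split
            }
            writer.finishRecordSet();
        }
    }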
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index c6035f2e51..0443a1132a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,6 +16,14 @@
*/
package org.apache.nifi.processors.standard;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -38,15 +46,6 @@ import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-
public class TestQueryRecord {
static {
@@ -261,7 +260,7 @@ public class TestQueryRecord {
}
@Override
- public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final List<RecordField> recordFields = columnNames.stream()
.map(name -> new RecordField(name, RecordFieldType.STRING.getDataType()))
.collect(Collectors.toList());
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
index c819a6c21c..22b88050de 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java
@@ -53,11 +53,12 @@ public interface RecordSetWriterFactory extends ControllerService {
*
*
* @param flowFile the FlowFile from which the schema should be determined.
- * @param content the contents of the FlowFile from which to determine the schema
+ * @param readSchema the schema that was read from the incoming FlowFile, or <code>null</code> if there is no input schema
+ *
* @return the Schema that should be used for writing Records
* @throws SchemaNotFoundException if unable to find the schema
*/
- RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException;
+ RecordSchema getSchema(FlowFile flowFile, RecordSchema readSchema) throws SchemaNotFoundException, IOException;
/**
*
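
In practice the new contract produces the two-step sequence that the processors in this patch now share: read the input schema, let the writer factory refine or replace it, then create the writer. In outline (a sketch; stream setup elided):

    try (final RecordReader reader = readerFactory.createRecordReader(flowFile, in, logger)) {
        // Step 1: the writer factory decides the output schema, given the input schema.
        final RecordSchema writeSchema = writerFactory.getSchema(flowFile, reader.getSchema());
        // Step 2: only then is a writer created for that schema.
        try (final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, out)) {
            writer.beginRecordSet();
            Record record;
            while ((record = reader.nextRecord()) != null) {
                writer.write(record);
            }
            writer.finishRecordSet();
        }
    }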
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 25a115d9db..88b657cfa9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -89,7 +89,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {
return new AvroReaderWithEmbeddedSchema(in);
} else {
- final RecordSchema recordSchema = getSchema(flowFile, in);
+ final RecordSchema recordSchema = getSchema(flowFile, in, null);
final Schema avroSchema;
try {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 70da1e8e60..fd09961fe5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -65,7 +65,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
try {
final Schema avroSchema;
try {
- if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
+ if (recordSchema.getSchemaFormat().isPresent() && recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {
final Optional<String> textOption = recordSchema.getSchemaText();
if (textOption.isPresent()) {
avroSchema = compileAvroSchema(textOption.get());
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
index eba9429d9d..d32e3e5f15 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
@@ -36,7 +36,7 @@ public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>());
final Schema avroSchema = dataFileStream.getSchema();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index f2b1cbb615..fa60b2a86e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -53,7 +53,7 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
if (this.context == null) {
throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index dbea3dce1f..15280524b8 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -89,7 +89,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
// Use Mark/Reset of a BufferedInputStream in case we read from the Input Stream for the header.
final BufferedInputStream bufferedIn = new BufferedInputStream(in);
bufferedIn.mark(1024 * 1024);
- final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn));
+ final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null);
bufferedIn.reset();
return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index dcf8b5a9ac..fae3eba690 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -222,7 +222,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+ public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException {
return recordSchema;
}
@@ -235,7 +235,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
- final RecordSchema schema = getSchema(flowFile, in);
+ final RecordSchema schema = getSchema(flowFile, in, null);
return new GrokRecordReader(in, grok, schema, appendUnmatchedLine);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
index 2d11a9b6f9..45cbbd1d43 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -129,7 +129,7 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
- final RecordSchema schema = getSchema(flowFile, in);
+ final RecordSchema schema = getSchema(flowFile, in, null);
return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat);
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 1dd9834110..063c9df094 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -70,6 +70,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
- return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in), dateFormat, timeFormat, timestampFormat);
+ return new JsonTreeRowRecordReader(in, logger, getSchema(flowFile, in, null), dateFormat, timeFormat, timestampFormat);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 7b60815e97..0acf6ff951 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -17,6 +17,10 @@
package org.apache.nifi.serialization;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.INHERIT_RECORD_SCHEMA;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -34,11 +38,12 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
+import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNameAsAttribute;
import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.record.RecordSchema;
public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryService {
@@ -58,6 +63,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
"The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
+ "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
+ static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile.");
/**
* This constant is just a base spec for the actual PropertyDescriptor.
@@ -67,7 +73,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder()
.name("Schema Write Strategy")
.description("Specifies how the schema for a Record should be added to the data.")
- .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
+ .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)
.defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue())
.required(true)
.build();
@@ -76,7 +82,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
private volatile ConfigurationContext configurationContext;
private volatile SchemaAccessWriter schemaAccessWriter;
- private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+ private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList(
+ SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA));
+ private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList(
+ SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
@Override
@@ -98,6 +107,11 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
return SCHEMA_NAME_ATTRIBUTE;
}
+ @Override
+ protected AllowableValue getDefaultSchemaAccessStrategy() {
+ return INHERIT_RECORD_SCHEMA;
+ }
+
protected PropertyDescriptor getSchemaWriteStrategyDescriptor() {
return getPropertyDescriptor(SCHEMA_WRITE_STRATEGY.getName());
}
@@ -121,7 +135,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
}
protected List<AllowableValue> getSchemaWriteStrategyValues() {
- return strategyList;
+ return schemaWriteStrategyList;
+ }
+
+ @Override
+ protected List<AllowableValue> getSchemaAccessStrategyValues() {
+ return schemaAccessStrategyList;
}
protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
@@ -132,11 +151,13 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
return new SchemaNameAsAttribute();
} else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
- return new SchemaTextAsAttribute();
+ return new WriteAvroSchemaAttributeStrategy();
} else if (allowableValue.equalsIgnoreCase(HWX_CONTENT_ENCODED_SCHEMA.getValue())) {
return new HortonworksEncodedSchemaReferenceWriter();
} else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
return new HortonworksAttributeSchemaReferenceWriter();
+ } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
+ return new NopSchemaAccessWriter();
}
return null;
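
The new NO_SCHEMA branch returns the NopSchemaAccessWriter created elsewhere in this patch; its source is not shown in this section. A no-op writer of that kind plausibly looks like the following, assuming the four methods of the SchemaAccessWriter interface:

    public class NopSchemaAccessWriter implements SchemaAccessWriter {
        @Override
        public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException {
            // Intentionally empty: no schema information is written into the content.
        }

        @Override
        public Map<String, String> getAttributes(final RecordSchema schema) {
            return Collections.emptyMap();    // no schema-related FlowFile attributes
        }

        @Override
        public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException {
            // Nothing to validate; any schema is acceptable.
        }

        @Override
        public Set<SchemaField> getRequiredSchemaFields() {
            return EnumSet.noneOf(SchemaField.class);
        }
    }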
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index b2c683d2ed..ddcfb0ce78 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -32,6 +32,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -55,8 +56,10 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
private volatile ConfigurationContext configurationContext;
private volatile SchemaAccessStrategy schemaAccessStrategy;
+ private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
- private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
+ private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList(
+ SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA));
protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());
@@ -95,6 +98,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
this.schemaAccessStrategy = getSchemaAccessStrategy(schemaAccess, schemaRegistry, context);
}
+ @Override
protected ConfigurationContext getConfigurationContext() {
return configurationContext;
}
@@ -103,13 +107,22 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
return schemaAccessStrategy;
}
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
+ public final RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
if (accessStrategy == null) {
throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
}
- return getSchemaAccessStrategy().getSchema(flowFile, contentStream);
+ return getSchemaAccessStrategy().getSchema(flowFile, contentStream, readSchema);
+ }
+
+ public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
+ if (accessStrategy == null) {
+ throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
+ }
+
+ return getSchemaAccessStrategy().getSchema(flowFile, EMPTY_INPUT_STREAM, readSchema);
}
@Override
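
The stream-free overload above exists so that strategies which never inspect the content, such as the InheritSchemaFromRecord added by this patch (not shown in this section), can be driven without a real stream. A plausible shape for that strategy, with the field-set accessor name assumed:

    public class InheritSchemaFromRecord implements SchemaAccessStrategy {
        @Override
        public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream, final RecordSchema readSchema)
                throws SchemaNotFoundException, IOException {
            if (readSchema == null) {
                throw new SchemaNotFoundException("Cannot inherit the schema because no input schema was provided");
            }
            return readSchema;    // the content stream is deliberately ignored
        }

        @Override
        public Set<SchemaField> getSuppliedSchemaFields() {    // accessor name assumed
            return EnumSet.noneOf(SchemaField.class);
        }
    }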
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
index e6a9054c05..7057e4378d 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java
@@ -18,11 +18,9 @@
package org.apache.nifi.text;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -38,7 +36,6 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
-import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
@@ -46,8 +43,6 @@ import org.apache.nifi.serialization.record.RecordSchema;
+ "text is able to make use of the Expression Language to reference each of the fields that are available "
+ "in a Record. Each record in the RecordSet will be separated by a single newline character.")
public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
- private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(Collections.emptyList());
-
static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
.name("Text")
.description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.")
@@ -87,7 +82,7 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
}
@Override
- public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
- return EMPTY_SCHEMA;
+ public RecordSchema getSchema(final FlowFile flowFile, final RecordSchema readSchema) throws SchemaNotFoundException, IOException {
+ return readSchema;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
index c25f0aab33..33c0857e69 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -27,7 +27,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaTextAsAttribute;
+import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.WriteResult;
import org.junit.Assert;
@@ -36,7 +36,7 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
@Override
protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException {
- return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new SchemaTextAsAttribute(), out);
+ return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out);
}
@Override
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
index 20548db6cf..e9de978598 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVHeaderSchemaStrategy.java
@@ -56,7 +56,7 @@ public class TestCSVHeaderSchemaStrategy {
final RecordSchema schema;
try (final InputStream bais = new ByteArrayInputStream(headerBytes)) {
- schema = strategy.getSchema(null, bais);
+ schema = strategy.getSchema(null, bais, null);
}
final List<String> expectedFieldNames = Arrays.asList("a", "b", "c", "d", "e,z", "f");