From ce1bc42ac556e8a0847553a33b112b74bd6fc10f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 16 May 2017 14:22:19 -0400 Subject: [PATCH] NIFI-3912: This closes #1809. Fixed NPE that would result in validation failure for FreeFormTextRecordSetWriter Signed-off-by: joewitt --- .../SchemaRegistryRecordSetWriter.java | 9 +++++++++ .../serialization/SchemaRegistryService.java | 16 ++++++++++++++++ .../nifi/text/FreeFormTextRecordSetWriter.java | 13 +++++++++++++ 3 files changed, 38 insertions(+) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index 36fe6052a7..7b60815e97 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -124,6 +125,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic } protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) { + if (allowableValue == null) { + return null; + } + if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) { return new SchemaNameAsAttribute(); } else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) { @@ -140,6 +145,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic protected Set getRequiredSchemaFields(final ValidationContext validationContext) { final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue(); final SchemaAccessWriter writer = getSchemaWriteStrategy(writeStrategyValue); + if (writer == null) { + return EnumSet.noneOf(SchemaField.class); + } + final Set requiredFields = writer.getRequiredSchemaFields(); return requiredFields; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index 0697940a67..b2c683d2ed 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -38,6 +38,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -103,6 +104,11 @@ public abstract class SchemaRegistryService extends AbstractControllerService { } public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(); + if (accessStrategy == null) { + throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service"); + } + return getSchemaAccessStrategy().getSchema(flowFile, contentStream); } @@ -121,15 +127,25 @@ public abstract class SchemaRegistryService extends AbstractControllerService { final SchemaRegistry schemaRegistry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext); + if (accessStrategy == null) { + return EnumSet.noneOf(SchemaField.class); + } final Set suppliedFields = accessStrategy.getSuppliedSchemaFields(); return suppliedFields; } protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { + if (allowableValue == null) { + return null; + } + return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { + if (allowableValue == null) { + return null; + } return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java index 022804eba6..4fcc3a2a4e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java @@ -17,8 +17,11 @@ package org.apache.nifi.text; +import java.io.IOException; +import java.io.InputStream; import java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -27,11 +30,14 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.RecordSchema; @Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"}) @@ -39,6 +45,8 @@ import org.apache.nifi.serialization.record.RecordSchema; + "text is able to make use of the Expression Language to reference each of the fields that are available " + "in a Record. Each record in the RecordSet will be separated by a single newline character.") public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory { + private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(Collections.emptyList()); + static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder() .name("Text") .description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.") @@ -76,4 +84,9 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) { return new FreeFormTextWriter(textValue, characterSet); } + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + return EMPTY_SCHEMA; + } }