From 9b177fbcbabe67b9ebb1d0a5d3fafe7b67aebda1 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 3 May 2017 12:33:30 -0400 Subject: [PATCH] NIFI-3787: Addressed NPE and ensure that if validation fails due to RuntimeException, that it gets logged. Also clarified documentation for Json Reader services This closes #1742. Signed-off-by: Bryan Bende --- .../nifi/serialization/record/MapRecord.java | 2 +- .../record/util/DataTypeUtils.java | 23 +++++++++ .../nifi/serialization/DateTimeUtils.java | 13 +++-- .../AbstractConfiguredComponent.java | 4 ++ .../java/org/apache/nifi/avro/AvroReader.java | 22 ++++++++ .../apache/nifi/avro/AvroRecordSetWriter.java | 2 - .../EmbeddedAvroSchemaAccessStrategy.java | 51 +++++++++++++++++++ .../nifi/csv/CSVHeaderSchemaStrategy.java | 9 ++++ .../java/org/apache/nifi/csv/CSVReader.java | 12 ++++- .../apache/nifi/csv/CSVRecordSetWriter.java | 3 +- .../java/org/apache/nifi/grok/GrokReader.java | 39 +++++++++----- .../nifi/json/JsonPathRowRecordReader.java | 6 +-- .../nifi/json/JsonTreeRowRecordReader.java | 6 +-- .../additionalDetails.html | 3 +- .../additionalDetails.html | 6 ++- 15 files changed, 170 insertions(+), 31 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index f9a22fc9ae..8d98c331e2 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -192,7 +192,7 @@ public class MapRecord implements Record { @Override public Date getAsDate(final String fieldName, final String format) { - return DataTypeUtils.toDate(getValue(fieldName), DataTypeUtils.getDateFormat(format), fieldName); + return DataTypeUtils.toDate(getValue(fieldName), format == null ? null : DataTypeUtils.getDateFormat(format), fieldName); } @Override diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java index f59ac0dddd..9f1e463524 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java @@ -32,6 +32,7 @@ import java.sql.Timestamp; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -343,6 +344,10 @@ public class DataTypeUtils { return getDateFormat(format).format((java.util.Date) value); } + if (value instanceof Object[]) { + return Arrays.toString((Object[]) value); + } + return value.toString(); } @@ -396,6 +401,10 @@ public class DataTypeUtils { } if (value instanceof String) { + if (format == null) { + return isInteger((String) value); + } + try { getDateFormat(format).parse((String) value); return true; @@ -407,6 +416,20 @@ public class DataTypeUtils { return false; } + private static boolean isInteger(final String value) { + if (value == null || value.isEmpty()) { + return false; + } + + for (int i = 0; i < value.length(); i++) { + if (!Character.isDigit(value.charAt(i))) { + return false; + } + } + + return true; + } + public static Time toTime(final Object value, final DateFormat format, final String fieldName) { if (value == null) { return null; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java index efc3e4e6ba..6336943ee3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java @@ -23,7 +23,9 @@ public class DateTimeUtils { public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() .name("Date Format") .description("Specifies the format to use when reading/writing Date fields. " - + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).") + + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). " + + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by " + + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).") .expressionLanguageSupported(false) .addValidator(new SimpleDateFormatValidator()) .required(false) @@ -32,7 +34,9 @@ public class DateTimeUtils { public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() .name("Time Format") .description("Specifies the format to use when reading/writing Time fields. " - + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).") + + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). " + + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by " + + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).") .expressionLanguageSupported(false) .addValidator(new SimpleDateFormatValidator()) .required(false) @@ -41,7 +45,10 @@ public class DateTimeUtils { public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() .name("Timestamp Format") .description("Specifies the format to use when reading/writing Timestamp fields. " - + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).") + + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). " + + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by " + + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by " + + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).") .expressionLanguageSupported(false) .addValidator(new SimpleDateFormatValidator()) .required(false) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java index 1156987cf1..50a71e7633 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java @@ -30,6 +30,8 @@ import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.registry.VariableRegistry; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URL; import java.util.ArrayList; @@ -49,6 +51,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent { + private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class); private final String id; private final ValidationContextFactory validationContextFactory; @@ -463,6 +466,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone } } } catch (final Throwable t) { + logger.error("Failed to perform validation of " + this, t); results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build()); } finally { lock.unlock(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index 205fec63a6..8451f5a843 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -29,10 +29,14 @@ import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaAccessUtils; import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; @@ -62,6 +66,24 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac return allowableValues; } + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) { + if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { + return new EmbeddedAvroSchemaAccessStrategy(); + } else { + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + } + + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ValidationContext context) { + if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { + return new EmbeddedAvroSchemaAccessStrategy(); + } else { + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + } + @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index 91ca6cfc2d..121d1ec9f5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -65,8 +65,6 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement try { final RecordSchema recordSchema = getSchema(flowFile, in); - - final Schema avroSchema; try { if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java new file mode 100644 index 0000000000..eba9429d9d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.avro; + +import java.io.IOException; +import java.io.InputStream; +import java.util.EnumSet; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.schema.access.SchemaAccessStrategy; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; + +public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy { + private final Set schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { + final DataFileStream dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader()); + final Schema avroSchema = dataFileStream.getSchema(); + final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); + return recordSchema; + } + + @Override + public Set getSuppliedSchemaFields() { + return schemaFields; + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java index a57f10b01c..f2b1cbb615 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.io.input.BOMInputStream; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.schema.access.SchemaAccessStrategy; @@ -47,8 +48,16 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy { this.context = context; } + public CSVHeaderSchemaStrategy(final ValidationContext context) { + this.context = null; + } + @Override public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + if (this.context == null) { + throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema"); + } + try { final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader(); try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream)); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index 9fe41360a0..dbea3dce1f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -52,8 +53,6 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact "The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the " + "column names in the header and assuming that all columns are of type String."); - private volatile SchemaAccessStrategy headerDerivedSchemaStrategy; - private volatile CSVFormat csvFormat; private volatile String dateFormat; private volatile String timeFormat; @@ -105,6 +104,15 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { + if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) { + return new CSVHeaderSchemaStrategy(context); + } + + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + @Override protected List getSchemaAccessStrategyValues() { final List allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues()); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index 15f1e1f239..0b29f09434 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -38,7 +38,8 @@ import org.apache.nifi.serialization.record.RecordSchema; @Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"}) @CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written " - + "will be the column names. All subsequent lines will be the values corresponding to those columns.") + + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values " + + "corresponding to the record fields.") public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { private volatile CSVFormat csvFormat; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java index 6c8deabe78..a874632e89 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java @@ -33,6 +33,7 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; @@ -180,24 +181,36 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac @Override protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) { if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { - return new SchemaAccessStrategy() { - private final Set schemaFields = EnumSet.noneOf(SchemaField.class); - - @Override - public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { - return recordSchema; - } - - @Override - public Set getSuppliedSchemaFields() { - return schemaFields; - } - }; + return createAccessStrategy(); } else { return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); } } + @Override + protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { + if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { + return createAccessStrategy(); + } else { + return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); + } + } + + private SchemaAccessStrategy createAccessStrategy() { + return new SchemaAccessStrategy() { + private final Set schemaFields = EnumSet.noneOf(SchemaField.class); + + @Override + public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException { + return recordSchema; + } + + @Override + public Set getSuppliedSchemaFields() { + return schemaFields; + } + }; + } @Override public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java index 2eebfe27d9..7d2a2c1b18 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java @@ -62,9 +62,9 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { throws MalformedRecordException, IOException { super(in, logger); - this.dateFormat = DataTypeUtils.getDateFormat(dateFormat); - this.timeFormat = DataTypeUtils.getDateFormat(timeFormat); - this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat); + this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); this.schema = schema; this.jsonPaths = jsonPaths; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index f15e04e4cc..ee5bff91be 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -56,9 +56,9 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { super(in, logger); this.schema = schema; - this.dateFormat = DataTypeUtils.getDateFormat(dateFormat); - this.timeFormat = DataTypeUtils.getDateFormat(timeFormat); - this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat); + this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat); + this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat); + this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html index aceb54de95..14d40f6236 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html @@ -81,7 +81,8 @@
  • Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.
  • Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).
  • Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding - property (Date Format, Time Format, Timestamp Format property).
  • + property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String + representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).

    diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html index 90980d118a..c08e72041c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html @@ -25,7 +25,8 @@ The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire JSON Object tree. The Controller Service must be configured with a Schema that describes the structure of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped. - If the schema contains a field for which no JSON field exists, a null value will be used in the Record. + If the schema contains a field for which no JSON field exists, a null value will be used in the Record + (or the default value defined in the schema, if applicable).

    @@ -66,7 +67,8 @@

  • Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.
  • Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).
  • Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding - property (Date Format, Time Format, Timestamp Format property).
  • + property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String + representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).