NIFI-3787: Addressed NPE and ensure that if validation fails due to RuntimeException, that it gets logged. Also clarified documentation for Json Reader services

This closes #1742.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-05-03 12:33:30 -04:00 committed by Bryan Bende
parent b7c15c360b
commit 9b177fbcba
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
15 changed files with 170 additions and 31 deletions

View File

@ -192,7 +192,7 @@ public class MapRecord implements Record {
@Override
public Date getAsDate(final String fieldName, final String format) {
return DataTypeUtils.toDate(getValue(fieldName), DataTypeUtils.getDateFormat(format), fieldName);
return DataTypeUtils.toDate(getValue(fieldName), format == null ? null : DataTypeUtils.getDateFormat(format), fieldName);
}
@Override

View File

@ -32,6 +32,7 @@ import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@ -343,6 +344,10 @@ public class DataTypeUtils {
return getDateFormat(format).format((java.util.Date) value);
}
if (value instanceof Object[]) {
return Arrays.toString((Object[]) value);
}
return value.toString();
}
@ -396,6 +401,10 @@ public class DataTypeUtils {
}
if (value instanceof String) {
if (format == null) {
return isInteger((String) value);
}
try {
getDateFormat(format).parse((String) value);
return true;
@ -407,6 +416,20 @@ public class DataTypeUtils {
return false;
}
private static boolean isInteger(final String value) {
if (value == null || value.isEmpty()) {
return false;
}
for (int i = 0; i < value.length(); i++) {
if (!Character.isDigit(value.charAt(i))) {
return false;
}
}
return true;
}
public static Time toTime(final Object value, final DateFormat format, final String fieldName) {
if (value == null) {
return null;

View File

@ -23,7 +23,9 @@ public class DateTimeUtils {
public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
.name("Date Format")
.description("Specifies the format to use when reading/writing Date fields. "
+ "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+ "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
.expressionLanguageSupported(false)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@ -32,7 +34,9 @@ public class DateTimeUtils {
public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
.name("Time Format")
.description("Specifies the format to use when reading/writing Time fields. "
+ "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+ "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+ "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
.expressionLanguageSupported(false)
.addValidator(new SimpleDateFormatValidator())
.required(false)
@ -41,7 +45,10 @@ public class DateTimeUtils {
public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.name("Timestamp Format")
.description("Specifies the format to use when reading/writing Timestamp fields. "
+ "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+ "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+ "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+ "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+ "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
.expressionLanguageSupported(false)
.addValidator(new SimpleDateFormatValidator())
.required(false)

View File

@ -30,6 +30,8 @@ import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.ArrayList;
@ -49,6 +51,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {
private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class);
private final String id;
private final ValidationContextFactory validationContextFactory;
@ -463,6 +466,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
}
}
} catch (final Throwable t) {
logger.error("Failed to perform validation of " + this, t);
results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
} finally {
lock.unlock();

View File

@ -29,10 +29,14 @@ import org.apache.avro.Schema;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@ -62,6 +66,24 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
return allowableValues;
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) {
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
return new EmbeddedAvroSchemaAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ValidationContext context) {
if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) {
return new EmbeddedAvroSchemaAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();

View File

@ -65,8 +65,6 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
try {
final RecordSchema recordSchema = getSchema(flowFile, in);
final Schema avroSchema;
try {
if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.util.EnumSet;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaField;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy {
private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT);
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>());
final Schema avroSchema = dataFileStream.getSchema();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
return recordSchema;
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
}
}

View File

@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
@ -47,8 +48,16 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
this.context = context;
}
public CSVHeaderSchemaStrategy(final ValidationContext context) {
this.context = null;
}
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
if (this.context == null) {
throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
}
try {
final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader();
try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream));

View File

@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@ -52,8 +53,6 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
"The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
+ "column names in the header and assuming that all columns are of type String.");
private volatile SchemaAccessStrategy headerDerivedSchemaStrategy;
private volatile CSVFormat csvFormat;
private volatile String dateFormat;
private volatile String timeFormat;
@ -105,6 +104,15 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) {
return new CSVHeaderSchemaStrategy(context);
}
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
@Override
protected List<AllowableValue> getSchemaAccessStrategyValues() {
final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());

View File

@ -38,7 +38,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
+ "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+ "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
+ "corresponding to the record fields.")
public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
private volatile CSVFormat csvFormat;

View File

@ -33,6 +33,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@ -180,24 +181,36 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
return new SchemaAccessStrategy() {
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
return recordSchema;
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
}
};
return createAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
}
@Override
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
return createAccessStrategy();
} else {
return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
}
}
private SchemaAccessStrategy createAccessStrategy() {
return new SchemaAccessStrategy() {
private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
@Override
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
return recordSchema;
}
@Override
public Set<SchemaField> getSuppliedSchemaFields() {
return schemaFields;
}
};
}
@Override
public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {

View File

@ -62,9 +62,9 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
throws MalformedRecordException, IOException {
super(in, logger);
this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
this.schema = schema;
this.jsonPaths = jsonPaths;

View File

@ -56,9 +56,9 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
super(in, logger);
this.schema = schema;
this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
}

View File

@ -81,7 +81,8 @@
<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
property (Date Format, Time Format, Timestamp Format property).</li>
property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
</ul>
<p>

View File

@ -25,7 +25,8 @@
The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire
JSON Object tree. The Controller Service must be configured with a Schema that describes the structure
of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped.
If the schema contains a field for which no JSON field exists, a null value will be used in the Record.
If the schema contains a field for which no JSON field exists, a null value will be used in the Record
(or the default value defined in the schema, if applicable).
</p>
<p>
@ -66,7 +67,8 @@
<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
property (Date Format, Time Format, Timestamp Format property).</li>
property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
</ul>
<p>