mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 10:38:33 +00:00
NIFI-11197 Added YamlTreeReader
- Adjusted JsonTreeReader implementation for sharing common Jackson components This closes #7665 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7f7e3f0e7a
commit
4b95129f96
@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
@ -56,7 +55,7 @@ import java.util.function.Supplier;
|
||||
public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
|
||||
|
||||
static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
|
||||
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
|
||||
.name("Max String Length")
|
||||
.displayName("Max String Length")
|
||||
.description("The maximum allowed length of a string value when parsing the JSON document")
|
||||
@ -88,7 +87,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
private JsonParser jsonParser;
|
||||
private JsonNode firstJsonNode;
|
||||
private StartingFieldStrategy strategy;
|
||||
|
||||
private Map<String, String> capturedFields;
|
||||
private BiPredicate<String, String> captureFieldPredicate;
|
||||
|
||||
@ -104,28 +102,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
lazyTimestampFormat = () -> tsf;
|
||||
}
|
||||
|
||||
protected AbstractJsonRowRecordReader(final InputStream in,
|
||||
final ComponentLog logger,
|
||||
final String dateFormat,
|
||||
final String timeFormat,
|
||||
final String timestampFormat)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, false, null);
|
||||
}
|
||||
|
||||
protected AbstractJsonRowRecordReader(final InputStream in,
|
||||
final ComponentLog logger,
|
||||
final String dateFormat,
|
||||
final String timeFormat,
|
||||
final String timestampFormat,
|
||||
final boolean allowComments,
|
||||
final StreamReadConstraints streamReadConstraints)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor with initial logic for JSON to NiFi record parsing.
|
||||
*
|
||||
@ -140,7 +116,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
* be accessed by calling {@link #getCapturedFields()}
|
||||
* @param allowComments whether to allow comments within the JSON stream
|
||||
* @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints}
|
||||
*
|
||||
* @param tokenParserFactory factory to provide an instance of com.fasterxml.jackson.core.JsonParser
|
||||
* @throws IOException in case of JSON stream processing failure
|
||||
* @throws MalformedRecordException in case of malformed JSON input
|
||||
*/
|
||||
@ -153,7 +129,8 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
final String nestedFieldName,
|
||||
final BiPredicate<String, String> captureFieldPredicate,
|
||||
final boolean allowComments,
|
||||
final StreamReadConstraints streamReadConstraints)
|
||||
final StreamReadConstraints streamReadConstraints,
|
||||
final TokenParserFactory tokenParserFactory)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(logger, dateFormat, timeFormat, timestampFormat);
|
||||
@ -163,14 +140,8 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
|
||||
capturedFields = new LinkedHashMap<>();
|
||||
|
||||
try {
|
||||
final ObjectMapper codec = new ObjectMapper();
|
||||
if (allowComments) {
|
||||
codec.enable(JsonParser.Feature.ALLOW_COMMENTS);
|
||||
}
|
||||
codec.getFactory().setStreamReadConstraints(streamReadConstraints != null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS);
|
||||
|
||||
jsonParser = codec.getFactory().createParser(in);
|
||||
jsonParser.setCodec(codec);
|
||||
final StreamReadConstraints configuredStreamReadConstraints = streamReadConstraints == null ? DEFAULT_STREAM_READ_CONSTRAINTS : streamReadConstraints;
|
||||
jsonParser = tokenParserFactory.getJsonParser(in, configuredStreamReadConstraints, allowComments);
|
||||
|
||||
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
|
||||
while (jsonParser.nextToken() != null) {
|
||||
|
@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Objects;
|
||||
|
||||
public class JsonParserFactory implements TokenParserFactory {
|
||||
@Override
|
||||
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
|
||||
Objects.requireNonNull(in, "Input Stream required");
|
||||
Objects.requireNonNull(streamReadConstraints, "Stream Read Constraints required");
|
||||
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
|
||||
if (allowComments) {
|
||||
objectMapper.enable(JsonParser.Feature.ALLOW_COMMENTS);
|
||||
}
|
||||
final JsonFactory jsonFactory = objectMapper.getFactory();
|
||||
return jsonFactory.createParser(in);
|
||||
}
|
||||
}
|
@ -66,7 +66,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
|
||||
throws MalformedRecordException, IOException {
|
||||
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints);
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory());
|
||||
|
||||
this.schema = schema;
|
||||
this.jsonPaths = jsonPaths;
|
||||
|
@ -16,12 +16,11 @@
|
||||
*/
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.core.io.SerializedString;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.nifi.schema.inference.RecordSource;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -31,29 +30,28 @@ import java.io.InputStream;
|
||||
|
||||
public class JsonRecordSource implements RecordSource<JsonNode> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
|
||||
private static final JsonFactory jsonFactory;
|
||||
|
||||
private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.defaults();
|
||||
|
||||
private static final boolean ALLOW_COMMENTS_ENABLED = true;
|
||||
|
||||
private final JsonParser jsonParser;
|
||||
private final StartingFieldStrategy strategy;
|
||||
private final String startingFieldName;
|
||||
|
||||
static {
|
||||
jsonFactory = new JsonFactory();
|
||||
jsonFactory.setCodec(new ObjectMapper());
|
||||
}
|
||||
|
||||
public JsonRecordSource(final InputStream in) throws IOException {
|
||||
jsonParser = jsonFactory.createParser(in);
|
||||
strategy = null;
|
||||
startingFieldName = null;
|
||||
this(in, null, null);
|
||||
}
|
||||
|
||||
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
|
||||
jsonParser = jsonFactory.createParser(in);
|
||||
this(in , strategy, startingFieldName, new JsonParserFactory());
|
||||
}
|
||||
|
||||
public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory) throws IOException {
|
||||
jsonParser = tokenParserFactory.getJsonParser(in, DEFAULT_STREAM_READ_CONSTRAINTS, ALLOW_COMMENTS_ENABLED);
|
||||
this.strategy = strategy;
|
||||
this.startingFieldName = startingFieldName;
|
||||
|
||||
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
|
||||
final SerializedString serializedNestedField = new SerializedString(this.startingFieldName);
|
||||
final SerializedString serializedNestedField = new SerializedString(startingFieldName);
|
||||
while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken());
|
||||
logger.debug("Parsing starting at nested field [{}]", startingFieldName);
|
||||
}
|
||||
|
@ -56,16 +56,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, false, null);
|
||||
}
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat,
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null,
|
||||
allowComments, streamReadConstraints);
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
|
||||
}
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
@ -75,18 +66,18 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
|
||||
captureFieldPredicate, false, null);
|
||||
captureFieldPredicate, false, null, new JsonParserFactory());
|
||||
}
|
||||
|
||||
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat,
|
||||
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
|
||||
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate,
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints)
|
||||
final boolean allowComments, final StreamReadConstraints streamReadConstraints, final TokenParserFactory tokenParserFactory)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate,
|
||||
allowComments, streamReadConstraints);
|
||||
allowComments, streamReadConstraints, tokenParserFactory);
|
||||
|
||||
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
|
||||
this.schema = getSelectedSchema(schema, startingFieldName);
|
||||
@ -110,7 +101,6 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
throw new RuntimeException(String.format("Selected schema field [%s] not found.", startingFieldName));
|
||||
}
|
||||
|
@ -0,0 +1,36 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.json;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public interface TokenParserFactory {
|
||||
/**
|
||||
* Get JSON Parser implementation for provided Input Stream with configured settings
|
||||
*
|
||||
* @param in Input Stream to be parsed
|
||||
* @param streamReadConstraints Stream Read Constraints applied
|
||||
* @param allowComments Whether to allow comments when parsing
|
||||
* @return JSON Parser
|
||||
* @throws IOException Thrown on failures to read the Input Stream
|
||||
*/
|
||||
JsonParser getJsonParser(InputStream in, StreamReadConstraints streamReadConstraints, boolean allowComments) throws IOException;
|
||||
}
|
@ -0,0 +1,45 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-utils</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
<artifactId>nifi-yaml-record-utils</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-json-record-utils</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-yaml</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
|
||||
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
|
||||
import org.apache.nifi.json.TokenParserFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class YamlParserFactory implements TokenParserFactory {
|
||||
private static final YAMLFactory YAML_FACTORY = new YAMLFactory(new YAMLMapper());
|
||||
|
||||
/**
|
||||
* Get Parser implementation for YAML
|
||||
*
|
||||
* @param in Input Stream to be parsed
|
||||
* @param streamReadConstraints Stream Read Constraints are not supported in YAML
|
||||
* @param allowComments Whether to allow comments when parsing does not apply to YAML
|
||||
* @return YAML Parser
|
||||
* @throws IOException Thrown on parser creation failures
|
||||
*/
|
||||
@Override
|
||||
public JsonParser getJsonParser(final InputStream in, final StreamReadConstraints streamReadConstraints, final boolean allowComments) throws IOException {
|
||||
return YAML_FACTORY.createParser(in);
|
||||
}
|
||||
}
|
@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import org.apache.nifi.json.JsonRecordSource;
|
||||
import org.apache.nifi.json.StartingFieldStrategy;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
public class YamlRecordSource extends JsonRecordSource {
|
||||
public YamlRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
|
||||
super(in, strategy, startingFieldName, new YamlParserFactory());
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import org.apache.nifi.json.JsonTreeRowRecordReader;
|
||||
import org.apache.nifi.json.SchemaApplicationStrategy;
|
||||
import org.apache.nifi.json.StartingFieldStrategy;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.function.BiPredicate;
|
||||
|
||||
public class YamlTreeRowRecordReader extends JsonTreeRowRecordReader {
|
||||
|
||||
public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
|
||||
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
|
||||
}
|
||||
|
||||
public YamlTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
|
||||
final String dateFormat, final String timeFormat, final String timestampFormat,
|
||||
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
|
||||
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate)
|
||||
throws IOException, MalformedRecordException {
|
||||
|
||||
super(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
|
||||
captureFieldPredicate, true, null, new YamlParserFactory());
|
||||
}
|
||||
}
|
@ -30,6 +30,7 @@
|
||||
<module>nifi-json-record-utils</module>
|
||||
<module>nifi-mock-record-utils</module>
|
||||
<module>nifi-schema-inference-utils</module>
|
||||
<module>nifi-yaml-record-utils</module>
|
||||
</modules>
|
||||
|
||||
<dependencyManagement>
|
||||
|
@ -68,6 +68,11 @@
|
||||
<artifactId>nifi-json-record-utils</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-yaml-record-utils</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-csv</artifactId>
|
||||
@ -270,6 +275,7 @@
|
||||
<exclude>src/test/resources/xml/testschema</exclude>
|
||||
<exclude>src/test/resources/xml/testschema2</exclude>
|
||||
<exclude>src/test/resources/xml/testschema3</exclude>
|
||||
<exclude>src/test/resources/yaml/*.yaml</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
@ -70,12 +70,12 @@ import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
|
||||
+ "See the Usage of the Controller Service for more information and examples.")
|
||||
@SeeAlso(JsonPathReader.class)
|
||||
public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
|
||||
private volatile String dateFormat;
|
||||
private volatile String timeFormat;
|
||||
private volatile String timestampFormat;
|
||||
private volatile String startingFieldName;
|
||||
private volatile StartingFieldStrategy startingFieldStrategy;
|
||||
private volatile SchemaApplicationStrategy schemaApplicationStrategy;
|
||||
protected volatile String dateFormat;
|
||||
protected volatile String timeFormat;
|
||||
protected volatile String timestampFormat;
|
||||
protected volatile String startingFieldName;
|
||||
protected volatile StartingFieldStrategy startingFieldStrategy;
|
||||
protected volatile SchemaApplicationStrategy schemaApplicationStrategy;
|
||||
private volatile boolean allowComments;
|
||||
private volatile StreamReadConstraints streamReadConstraints;
|
||||
|
||||
@ -138,9 +138,29 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
||||
this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
|
||||
this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue();
|
||||
this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue());
|
||||
this.streamReadConstraints = buildStreamReadConstraints(context);
|
||||
this.allowComments = isAllowCommentsEnabled(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Build Stream Read Constraints based on available properties
|
||||
*
|
||||
* @param context Configuration Context with property values
|
||||
* @return Stream Read Constraints
|
||||
*/
|
||||
protected StreamReadConstraints buildStreamReadConstraints(final ConfigurationContext context) {
|
||||
final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
|
||||
this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
|
||||
this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
|
||||
return StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether to allow comments when parsing based on available properties
|
||||
*
|
||||
* @param context Configuration Context with property values
|
||||
* @return Allow comments status
|
||||
*/
|
||||
protected boolean isAllowCommentsEnabled(final ConfigurationContext context) {
|
||||
return context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -153,9 +173,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
||||
|
||||
@Override
|
||||
protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccessStrategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
|
||||
final RecordSourceFactory<JsonNode> jsonSourceFactory =
|
||||
(var, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
|
||||
|
||||
final RecordSourceFactory<JsonNode> jsonSourceFactory = createJsonRecordSourceFactory();
|
||||
final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier =
|
||||
() -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
|
||||
|
||||
@ -163,16 +181,23 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
|
||||
() -> super.getSchemaAccessStrategy(schemaAccessStrategy, schemaRegistry, context));
|
||||
}
|
||||
|
||||
protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
|
||||
return (variables, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AllowableValue getDefaultSchemaAccessStrategy() {
|
||||
return INFER_SCHEMA;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
|
||||
throws IOException, MalformedRecordException, SchemaNotFoundException {
|
||||
final RecordSchema schema = getSchema(variables, in, null);
|
||||
return createJsonTreeRowRecordReader(in, logger, schema);
|
||||
}
|
||||
|
||||
protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema) throws IOException, MalformedRecordException {
|
||||
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
|
||||
schemaApplicationStrategy, null, allowComments, streamReadConstraints);
|
||||
schemaApplicationStrategy, null, allowComments, streamReadConstraints, new JsonParserFactory());
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.nifi.yaml;
|
||||
|
||||
import com.fasterxml.jackson.core.StreamReadConstraints;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.json.AbstractJsonRowRecordReader;
|
||||
import org.apache.nifi.json.JsonTreeReader;
|
||||
import org.apache.nifi.json.JsonTreeRowRecordReader;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.schema.inference.RecordSourceFactory;
|
||||
import org.apache.nifi.serialization.MalformedRecordException;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@Tags({"yaml", "tree", "record", "reader", "parser"})
|
||||
@CapabilityDescription("Parses YAML into individual Record objects. While the reader expects each record "
|
||||
+ "to be well-formed YAML, the content of a FlowFile may consist of many records, each as a well-formed "
|
||||
+ "YAML array or YAML object. "
|
||||
+ "If an array is encountered, each element in that array will be treated as a separate record. "
|
||||
+ "If the schema that is configured contains a field that is not present in the YAML, a null value will be used. If the YAML contains "
|
||||
+ "a field that is not present in the schema, that field will be skipped. "
|
||||
+ "See the Usage of the Controller Service for more information and examples.")
|
||||
public class YamlTreeReader extends JsonTreeReader {
|
||||
|
||||
private static final boolean ALLOW_COMMENTS_DISABLED = false;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
|
||||
// Remove those properties which are not applicable for YAML
|
||||
properties.remove(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
|
||||
properties.remove(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
|
||||
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected RecordSourceFactory<JsonNode> createJsonRecordSourceFactory() {
|
||||
return (var, in) -> new YamlRecordSource(in, startingFieldStrategy, startingFieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonTreeRowRecordReader createJsonTreeRowRecordReader(InputStream in, ComponentLog logger, RecordSchema schema) throws IOException, MalformedRecordException {
|
||||
return new YamlTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
|
||||
schemaApplicationStrategy, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StreamReadConstraints buildStreamReadConstraints(final ConfigurationContext context) {
|
||||
return StreamReadConstraints.defaults();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean isAllowCommentsEnabled(final ConfigurationContext context) {
|
||||
return ALLOW_COMMENTS_DISABLED;
|
||||
}
|
||||
}
|
@ -38,3 +38,4 @@ org.apache.nifi.xml.XMLReader
|
||||
org.apache.nifi.xml.XMLRecordSetWriter
|
||||
org.apache.nifi.windowsevent.WindowsEventLogReader
|
||||
org.apache.nifi.schema.inference.VolatileSchemaCache
|
||||
org.apache.nifi.yaml.YamlTreeReader
|
||||
|
@ -411,7 +411,7 @@
|
||||
|
||||
<p>
|
||||
When using JsonTreeReader with "Nested Field Strategy" and the "Schema Access Strategy" is not "Infer Schema",
|
||||
it can be configured for the entire original JSON ("Whole JSON" strategy) or for the nested field section ("Selected part" strategy).
|
||||
it can be configured for the entire original JSON ("Whole document" strategy) or for the nested field section ("Selected part" strategy).
|
||||
</p>
|
||||
|
||||
</body>
|
||||
|
@ -0,0 +1,454 @@
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<head>
|
||||
<meta charset="utf-8"/>
|
||||
<title>YamlTreeReader</title>
|
||||
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css"/>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<p>
|
||||
The YamlTreeReader Controller Service reads a YAML Object and creates a Record object either for the
|
||||
entire YAML Object tree or a subpart (see "Starting Field Strategies" section). The Controller Service
|
||||
must be configured with a Schema that describes the structure of the YAML data. If any field exists in
|
||||
the YAML that is not in the schema, that field will be skipped. If the schema contains a field for which
|
||||
no YAML field exists, a null value will be used in the Record (or the default value defined in the schema,
|
||||
if applicable).
|
||||
</p>
|
||||
|
||||
<p>
|
||||
If the root element of the YAML is a YAML Array, each YAML Object within that array will be treated as
|
||||
its own separate Record. If the root element is a YAML Object, the YAML will all be treated as a single
|
||||
Record.
|
||||
</p>
|
||||
|
||||
|
||||
<h2>Schemas and Type Coercion</h2>
|
||||
|
||||
<p>
|
||||
When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
|
||||
configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
|
||||
the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
|
||||
is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
|
||||
schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
|
||||
into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The following rules apply when attempting to coerce a field value from one data type to another:
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li>Any data type can be coerced into a String type.</li>
|
||||
<li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
|
||||
<li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
|
||||
milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
|
||||
<li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
|
||||
or "Timestamp Format."</li>
|
||||
<li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
|
||||
<code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
|
||||
type but not an Integer.</li>
|
||||
<li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
|
||||
<li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
|
||||
and the rest of the characters are ignored.</li>
|
||||
<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
|
||||
<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
|
||||
<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
|
||||
property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
|
||||
representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
|
||||
will be thrown.
|
||||
</p>
|
||||
|
||||
|
||||
|
||||
<h2>Schema Inference</h2>
|
||||
|
||||
<p>
|
||||
While NiFi's Record API does require that each Record have a schema, it is often convenient to infer the schema based on the values in the data,
|
||||
rather than having to manually create a schema. This is accomplished by selecting a value of "Infer Schema" for the "Schema Access Strategy" property.
|
||||
When using this strategy, the Reader will determine the schema by first parsing all data in the FlowFile, keeping track of all fields that it has encountered
|
||||
and the type of each field. Once all data has been parsed, a schema is formed that encompasses all fields that have been encountered.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
A common concern when inferring schemas is how to handle the condition of two values that have different types. For example, consider a FlowFile with the following two records:
|
||||
</p>
|
||||
<code><pre>
|
||||
[{
|
||||
"name": "John",
|
||||
"age": 8,
|
||||
"values": "N/A"
|
||||
}, {
|
||||
"name": "Jane",
|
||||
"age": "Ten",
|
||||
"values": [ 8, "Ten" ]
|
||||
}]
|
||||
</pre></code>
|
||||
|
||||
<p>
|
||||
It is clear that the "name" field will be inferred as a STRING type. However, how should we handle the "age" field? Should the field be a CHOICE between INT and STRING? Should we
|
||||
prefer LONG over INT? Should we just use a STRING? Should the field be considered nullable?
|
||||
</p>
|
||||
|
||||
<p>
|
||||
To help understand how this Record Reader infers schemas, we have the following list of rules that are followed in the inference logic:
|
||||
</p>
|
||||
|
||||
<ul>
|
||||
<li>All fields are inferred to be nullable.</li>
|
||||
<li>
|
||||
When two values are encountered for the same field in two different records (or two values are encountered for an ARRAY type), the inference engine prefers
|
||||
to use a "wider" data type over using a CHOICE data type. A data type "A" is said to be wider than data type "B" if and only if data type "A" encompasses all
|
||||
values of "B" in addition to other values. For example, the LONG type is wider than the INT type but not wider than the BOOLEAN type (and BOOLEAN is also not wider
|
||||
than LONG). INT is wider than SHORT. The STRING type is considered wider than all other types with the Exception of MAP, RECORD, ARRAY, and CHOICE.
|
||||
</li>
|
||||
<li>
|
||||
If two values are encountered for the same field in two different records (or two values are encountered for an ARRAY type), but neither value is of a type that
|
||||
is wider than the other, then a CHOICE type is used. In the example above, the "values" field will be inferred as a CHOICE between a STRING or an ARRAY<STRING>.
|
||||
</li>
|
||||
<li>
|
||||
If the "Time Format," "Timestamp Format," or "Date Format" properties are configured, any value that would otherwise be considered a STRING type is first checked against
|
||||
the configured formats to see if it matches any of them. If the value matches the Timestamp Format, the value is considered a Timestamp field. If it matches the Date Format,
|
||||
it is considered a Date field. If it matches the Time Format, it is considered a Time field. In the unlikely event that the value matches more than one of the configured
|
||||
formats, they will be matched in the order: Timestamp, Date, Time. I.e., if a value matched both the Timestamp Format and the Date Format, the type that is inferred will be
|
||||
Timestamp. Because parsing dates and times can be expensive, it is advisable not to configure these formats if dates, times, and timestamps are not expected, or if processing
|
||||
the data as a STRING is acceptable. For use cases when this is important, though, the inference engine is intelligent enough to optimize the parsing by first checking several
|
||||
very cheap conditions. For example, the string's length is examined to see if it is too long or too short to match the pattern. This results in far more efficient processing
|
||||
than would result if attempting to parse each string value as a timestamp.
|
||||
</li>
|
||||
<li>The MAP type is never inferred. Instead, the RECORD type is used.</li>
|
||||
<li>If a field exists but all values are null, then the field is inferred to be of type STRING.</li>
|
||||
</ul>
|
||||
|
||||
|
||||
|
||||
<h2>Caching of Inferred Schemas</h2>
|
||||
|
||||
<p>
|
||||
This Record Reader requires that if a schema is to be inferred, that all records be read in order to ensure that the schema that gets inferred is applicable for all
|
||||
records in the FlowFile. However, this can become expensive, especially if the data undergoes many different transformations. To alleviate the cost of inferring schemas,
|
||||
the Record Reader can be configured with a "Schema Inference Cache" by populating the property with that name. This is a Controller Service that can be shared by Record
|
||||
Readers and Record Writers.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Whenever a Record Writer is used to write data, if it is configured with a "Schema Cache," it will also add the schema to the Schema Cache. This will result in an
|
||||
identifier for that schema being added as an attribute to the FlowFile.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
Whenever a Record Reader is used to read data, if it is configured with a "Schema Inference Cache", it will first look for a "schema.cache.identifier" attribute on the FlowFile.
|
||||
If the attribute exists, it will use the value of that attribute to lookup the schema in the schema cache. If it is able to find a schema in the cache with that identifier,
|
||||
then it will use that schema instead of reading, parsing, and analyzing the data to infer the schema. If the attribute is not available on the FlowFile, or if the attribute is
|
||||
available but the cache does not have a schema with that identifier, then the Record Reader will proceed to infer the schema as described above.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The end result is that users are able to chain together many different Processors to operate on Record-oriented data. Typically, only the first such Processor in the chain will
|
||||
incur the "penalty" of inferring the schema. For all other Processors in the chain, the Record Reader is able to simply lookup the schema in the Schema Cache by identifier.
|
||||
This allows the Record Reader to infer a schema accurately, since it is inferred based on all data in the FlowFile, and still allows this to happen efficiently since the schema
|
||||
will typically only be inferred once, regardless of how many Processors handle the data.
|
||||
</p>
|
||||
|
||||
|
||||
<h2>Starting Field Strategies</h2>
|
||||
|
||||
<p>
|
||||
When using YamlTreeReader, two different starting field strategies can be selected. With the default Root Node strategy, the YamlTreeReader begins processing from the root element
|
||||
of the YAML and creates a Record object for the entire YAML Object tree, while the Nested Field strategy defines a nested field from which to begin processing.
|
||||
</p>
|
||||
<p>
|
||||
Using the Nested Field strategy, a schema corresponding to the nested YAML part should be specified. In case of schema inference, the YamlTreeReader will automatically
|
||||
infer a schema from nested records.
|
||||
</p>
|
||||
|
||||
<h3>Root Node Strategy</h3>
|
||||
|
||||
<p>
|
||||
Consider the following YAML is read with the default Root Node strategy:
|
||||
</p>
|
||||
<code>
|
||||
<pre>
|
||||
- id: 17
|
||||
name: John
|
||||
child:
|
||||
id: "1"
|
||||
dob: 10-29-1982
|
||||
siblings:
|
||||
- name: Jeremy
|
||||
id: 4
|
||||
- name: Julia
|
||||
id: 8
|
||||
- id: 98
|
||||
name: Jane
|
||||
child:
|
||||
id: 2
|
||||
dob: 08-30-1984
|
||||
gender: F
|
||||
siblingIds: []
|
||||
siblings: []
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
Also, consider that the schema that is configured for this YAML is as follows (assuming that the AvroSchemaRegistry
|
||||
Controller Service is chosen to denote the Schema):
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
{
|
||||
"type": "record",
|
||||
"name": "nifiRecord",
|
||||
"namespace": "org.apache.nifi",
|
||||
"fields": [
|
||||
{
|
||||
"name": "id",
|
||||
"type": ["int","null"]
|
||||
},
|
||||
{
|
||||
"name": "name",
|
||||
"type": ["string","null"]
|
||||
},
|
||||
{
|
||||
"name": "child",
|
||||
"type": [
|
||||
{
|
||||
"type": "record",
|
||||
"name": "childType",
|
||||
"fields": [
|
||||
{
|
||||
"name": "id",
|
||||
"type": ["int","string","null"]
|
||||
}
|
||||
]
|
||||
},
|
||||
"null"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "dob",
|
||||
"type": ["string","null"]
|
||||
},
|
||||
{
|
||||
"name": "siblings",
|
||||
"type": [
|
||||
{
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "record",
|
||||
"name": "siblingsType",
|
||||
"fields": [
|
||||
{
|
||||
"name": "name",
|
||||
"type": ["string","null"]
|
||||
},
|
||||
{
|
||||
"name": "id",
|
||||
"type": ["int","null"]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"null"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "gender",
|
||||
"type": ["string","null"]
|
||||
},
|
||||
{
|
||||
"name": "siblingIds",
|
||||
"type": [
|
||||
{
|
||||
"type": "array",
|
||||
"items": "string"
|
||||
},
|
||||
"null"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
Let us also assume that this Controller Service is configured with the "Date Format" property set to "MM-dd-yyyy", as this
|
||||
matches the date format used for our YAML data. This will result in the YAML creating two separate records, because the root
|
||||
element is a YAML array with two elements.
|
||||
</p>
|
||||
|
||||
<p>
|
||||
The first Record will consist of the following values:
|
||||
</p>
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<th>Field Name</th>
|
||||
<th>Field Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>id</td>
|
||||
<td>17</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>name</td>
|
||||
<td>John</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>gender</td>
|
||||
<td><i>null</i></td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>dob</td>
|
||||
<td>10-29-1982</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>siblings</td>
|
||||
<td>
|
||||
<i>array with two elements, each of which is itself a Record:</i>
|
||||
<br />
|
||||
<table>
|
||||
<tr>
|
||||
<th>Field Name</th>
|
||||
<th>Field Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>name</td>
|
||||
<td>Jeremy</td>
|
||||
</tr>
|
||||
</table>
|
||||
<br />
|
||||
<i>and:</i>
|
||||
<br />
|
||||
<table>
|
||||
<tr>
|
||||
<th>Field Name</th>
|
||||
<th>Field Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>name</td>
|
||||
<td>Julia</td>
|
||||
</tr>
|
||||
</table>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
<p>
|
||||
The second Record will consist of the following values:
|
||||
</p>
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<th>Field Name</th>
|
||||
<th>Field Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>id</td>
|
||||
<td>98</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>name</td>
|
||||
<td>Jane</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>gender</td>
|
||||
<td>F</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>dob</td>
|
||||
<td>08-30-1984</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>siblings</td>
|
||||
<td><i>empty array</i></td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
<h3>Nested Field Strategy</h3>
|
||||
|
||||
<p>
|
||||
Using the Nested Field strategy, consider the same YAML where the specified Starting Field Name is
|
||||
"siblings". The schema that is configured for this YAML is as follows:
|
||||
</p>
|
||||
|
||||
<code>
|
||||
<pre>
|
||||
{
|
||||
"namespace": "nifi",
|
||||
"name": "siblings",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "name", "type": "string" },
|
||||
{ "name": "id", "type": "int" }
|
||||
]
|
||||
}
|
||||
</pre>
|
||||
</code>
|
||||
|
||||
<p>
|
||||
The first Record will consist of the following values:
|
||||
</p>
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<th>Field Name</th>
|
||||
<th>Field Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>name</td>
|
||||
<td>Jeremy</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>id</td>
|
||||
<td>4</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
<p>
|
||||
The second Record will consist of the following values:
|
||||
</p>
|
||||
|
||||
<table>
|
||||
<tr>
|
||||
<th>Field Name</th>
|
||||
<th>Field Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>name</td>
|
||||
<td>Julia</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>id</td>
|
||||
<td>8</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
<h2>Schema Application Strategies</h2>
|
||||
|
||||
<p>
|
||||
When using YamlTreeReader with "Nested Field Strategy" and the "Schema Access Strategy" is not "Infer Schema",
|
||||
it can be configured for the entire original YAML ("Whole document" strategy) or for the nested field section ("Selected part" strategy).
|
||||
</p>
|
||||
|
||||
</body>
|
||||
</html>
|
@ -326,7 +326,8 @@ class TestJsonTreeRowRecordReader {
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
|
||||
try (final InputStream in = new FileInputStream(inputFile);
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
|
||||
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
|
||||
null, null, null, null, allowComments, streamReadConstraints, new JsonParserFactory())) {
|
||||
|
||||
final List<String> fieldNames = schema.getFieldNames();
|
||||
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,24 @@
|
||||
- id: 1
|
||||
name: John Doe
|
||||
balance: 4750.89
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
- id: 2
|
||||
name: Jane Doe
|
||||
balance: 4820.09
|
||||
address: 321 Your Street
|
||||
city: Your City
|
||||
state: NY
|
||||
zipCode: "33333"
|
||||
- id: 3
|
||||
name: Jake Doe
|
||||
balance: 4751.89
|
||||
address: 124 My Street
|
||||
address2: 'Apt. #12'
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
@ -0,0 +1,23 @@
|
||||
- id: 1
|
||||
name: John Doe
|
||||
balance: 4750.89
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
- id: 2
|
||||
name: Jane Doe
|
||||
balance: null
|
||||
address: 321 Your Street
|
||||
city: Your City
|
||||
state: NY
|
||||
zipCode: "33333"
|
||||
country: USA
|
||||
- id: 3
|
||||
name: Jimmy Doe
|
||||
address: 321 Your Street
|
||||
city: Your City
|
||||
state: NY
|
||||
zipCode: "33333"
|
||||
country: USA
|
@ -0,0 +1,16 @@
|
||||
- id: 1
|
||||
name: John Doe
|
||||
balance: 4750.89
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
- id: 2
|
||||
name: Jane Doe
|
||||
balance: 4820.09
|
||||
address: 321 Your Street
|
||||
city: Your City
|
||||
state: NY
|
||||
zipCode: "33333"
|
||||
country: USA
|
@ -0,0 +1,20 @@
|
||||
# comment before objects
|
||||
- id: 1
|
||||
name: John Doe
|
||||
# comment in object
|
||||
balance: 4750.89
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
# comment between objects
|
||||
- id: 2
|
||||
name: Jane Doe
|
||||
balance: 4820.09
|
||||
address: 321 Your Street
|
||||
city: Your City
|
||||
state: NY
|
||||
zipCode: "33333"
|
||||
country: USA
|
||||
# Comment after objects
|
@ -0,0 +1,15 @@
|
||||
id: 1
|
||||
accounts:
|
||||
- id: 42
|
||||
balance: 4750.89
|
||||
- id: 43
|
||||
balance: 48212.38
|
||||
name: John Doe
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
job:
|
||||
salary: 115431
|
||||
position: acountant
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
@ -0,0 +1,15 @@
|
||||
dataCollection:
|
||||
- record:
|
||||
- integer: 11
|
||||
boolean: true
|
||||
extraString: extraStringValue11
|
||||
- integer: 12
|
||||
boolean: false
|
||||
extraString: extraStringValue12
|
||||
- record:
|
||||
- integer: 21
|
||||
string: stringValue21
|
||||
extraString: extraStringValue21
|
||||
- integer: 22
|
||||
string: stringValue22
|
||||
extraString: extraStringValue22
|
@ -0,0 +1,16 @@
|
||||
dataCollection:
|
||||
- record:
|
||||
integer: 1
|
||||
- record:
|
||||
- integer: 21
|
||||
boolean: true
|
||||
- integer: 22
|
||||
boolean: false
|
||||
- record:
|
||||
integer: 3
|
||||
string: stringValue3
|
||||
- record:
|
||||
- integer: 41
|
||||
string: stringValue41
|
||||
- integer: 42
|
||||
string: stringValue42
|
@ -0,0 +1,7 @@
|
||||
dataCollection:
|
||||
- record:
|
||||
integer: 1
|
||||
boolean: true
|
||||
- record:
|
||||
integer: 2
|
||||
string: stringValue2
|
@ -0,0 +1,19 @@
|
||||
dataCollection:
|
||||
- record:
|
||||
integer: 1
|
||||
boolean: false
|
||||
- record:
|
||||
- integer: 21
|
||||
boolean: true
|
||||
- integer: 22
|
||||
boolean: false
|
||||
- record:
|
||||
integer: 3
|
||||
string: stringValue3
|
||||
- record:
|
||||
- integer: 41
|
||||
string: stringValue41
|
||||
- integer: 42
|
||||
string: stringValue42
|
||||
- integer: 43
|
||||
boolean: false
|
@ -0,0 +1,4 @@
|
||||
fields:
|
||||
- type: string
|
||||
- type:
|
||||
- type: string
|
@ -0,0 +1,6 @@
|
||||
- id: "1234"
|
||||
child:
|
||||
id: "4321"
|
||||
- id: "1234"
|
||||
child:
|
||||
name: child
|
@ -0,0 +1,13 @@
|
||||
- id: 1
|
||||
name: John Doe
|
||||
balance: 4750.89
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
accountIds:
|
||||
- id: n312kj3
|
||||
type: employee
|
||||
- id: dl2kdff
|
||||
type: security
|
@ -0,0 +1,9 @@
|
||||
- id: 17
|
||||
name: John
|
||||
accounts:
|
||||
- id: 42
|
||||
balance: 4750.89
|
||||
- id: 43
|
||||
balance: 48212.38
|
||||
- id: 98
|
||||
balance: 67829.12
|
@ -0,0 +1,7 @@
|
||||
dataCollection:
|
||||
- integer: 1
|
||||
boolean: true
|
||||
booleanOrString: true
|
||||
- integer: 2
|
||||
string: stringValue2
|
||||
booleanOrString: booleanOrStringValue2
|
@ -0,0 +1,10 @@
|
||||
id: 1
|
||||
name: John Doe
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
account:
|
||||
id: 42
|
||||
balance: true
|
@ -0,0 +1,8 @@
|
||||
id: 1
|
||||
name: John Doe
|
||||
balance: 4750.89
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
@ -0,0 +1,8 @@
|
||||
rootInt: 100
|
||||
rootString: root_string
|
||||
nestedLevel1Record:
|
||||
nestedLevel1Int: 110
|
||||
nestedLevel1String: root.level1:string
|
||||
nestedLevel2Record:
|
||||
nestedLevel2Int: 111
|
||||
nestedLevel2String: root.level1.level2:string
|
@ -0,0 +1,12 @@
|
||||
id: 1
|
||||
name: John Doe
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
accounts:
|
||||
- id: 42
|
||||
balance: 4750.89
|
||||
- id: 43
|
||||
balance: 48212.38
|
@ -0,0 +1,10 @@
|
||||
id: 1
|
||||
name: John Doe
|
||||
address: 123 My Street
|
||||
city: My City
|
||||
state: MS
|
||||
zipCode: "11111"
|
||||
country: USA
|
||||
account:
|
||||
id: 42
|
||||
balance: 4750.89
|
@ -0,0 +1,2 @@
|
||||
timestamp: 2019/06/27 13:04:04
|
||||
field_not_in_schema: some_value
|
@ -0,0 +1,6 @@
|
||||
created_at: Thu Feb 16 01:19:42 +0000 2017
|
||||
id: 832036744985577473
|
||||
unicode: ちゃ泣きそう
|
||||
from:
|
||||
id: 788946702264507903
|
||||
name: john
|
Loading…
x
Reference in New Issue
Block a user