NIFI-9963: Configure JsonTreeReader whether the schema applies to the Root Node or the Nested Node

This closes #6018.

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
This commit is contained in:
Lehel 2022-05-04 22:14:51 +02:00 committed by Tamas Palfy
parent 2072e4ed2a
commit d925989811
10 changed files with 282 additions and 56 deletions

View File

@ -36,6 +36,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.json.JsonTreeRowRecordReader;
import org.apache.nifi.json.SchemaApplicationStrategy;
import org.apache.nifi.json.StartingFieldStrategy;
import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
import org.apache.nifi.processor.AbstractProcessor;
@ -347,7 +348,8 @@ public class QuerySalesforceObject extends AbstractProcessor {
TIME_FORMAT,
DATE_TIME_FORMAT,
StartingFieldStrategy.NESTED_FIELD,
STARTING_FIELD_NAME
STARTING_FIELD_NAME,
SchemaApplicationStrategy.SELECTED_PART
);
RecordSetWriter writer = writerFactory.createWriter(

View File

@ -222,6 +222,7 @@
<exclude>src/test/resources/json/single-bank-account.json</exclude>
<exclude>src/test/resources/json/single-bank-account-wrong-field-type.json</exclude>
<exclude>src/test/resources/json/single-element-nested-array.json</exclude>
<exclude>src/test/resources/json/single-element-deep-nested.json</exclude>
<exclude>src/test/resources/json/single-element-nested.json</exclude>
<exclude>src/test/resources/json/single-element-nested-array-middle.json</exclude>
<exclude>src/test/resources/json/nested-array-then-start-object.json</exclude>

View File

@ -64,7 +64,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
private JsonNode firstJsonNode;
private StartingFieldStrategy strategy;
private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) {
this.logger = logger;
@ -80,25 +79,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
try {
jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
JsonToken token = jsonParser.nextToken();
if (token == JsonToken.START_ARRAY) {
token = jsonParser.nextToken(); // advance to START_OBJECT token
}
if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
firstJsonNode = jsonParser.readValueAsTree();
} else {
firstJsonNode = null;
}
} catch (final JsonParseException e) {
throw new MalformedRecordException("Could not parse data as JSON", e);
}
this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
}
protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,

View File

@ -45,12 +45,16 @@ import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA;
import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
import static org.apache.nifi.schema.inference.SchemaInferenceUtil.INFER_SCHEMA;
import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
@ -70,6 +74,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
private volatile String timestampFormat;
private volatile String startingFieldName;
private volatile StartingFieldStrategy startingFieldStrategy;
private volatile SchemaApplicationStrategy schemaApplicationStrategy;
public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("starting-field-strategy")
@ -77,11 +83,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
.description("Start processing from the root node or from a specified nested node.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(StartingFieldStrategy.ROOT_NODE.name())
.allowableValues(
Arrays.stream(StartingFieldStrategy.values()).map(startingStrategy ->
new AllowableValue(startingStrategy.name(), startingStrategy.getDisplayName(), startingStrategy.getDescription())
).toArray(AllowableValue[]::new))
.defaultValue(StartingFieldStrategy.ROOT_NODE.getValue())
.allowableValues(StartingFieldStrategy.class)
.build();
@ -95,6 +98,18 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
.dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name())
.build();
public static final PropertyDescriptor SCHEMA_APPLICATION_STRATEGY = new PropertyDescriptor.Builder()
.name("schema-application-strategy")
.displayName("Schema Application Strategy")
.description("Specifies whether the schema is defined for the whole JSON or for the selected part starting from \"Starting Field Name\".")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.defaultValue(SchemaApplicationStrategy.SELECTED_PART.getValue())
.dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name())
.dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)
.allowableValues(SchemaApplicationStrategy.class)
.build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
@ -104,6 +119,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
.build());
properties.add(STARTING_FIELD_STRATEGY);
properties.add(STARTING_FIELD_NAME);
properties.add(SCHEMA_APPLICATION_STRATEGY);
properties.add(DateTimeUtils.DATE_FORMAT);
properties.add(DateTimeUtils.TIME_FORMAT);
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
@ -117,6 +133,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue();
this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue());
}
@Override
@ -148,6 +165,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy);
}
}

View File

@ -39,26 +39,64 @@ import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Supplier;
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
private final RecordSchema schema;
private final RecordSchema schema;
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
super(in, logger, dateFormat, timeFormat, timestampFormat);
this.schema = schema;
this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
final String dateFormat, final String timeFormat, final String timestampFormat,
final StartingFieldStrategy strategy, final String startingFieldName) throws IOException, MalformedRecordException {
super(in, logger, dateFormat, timeFormat, timestampFormat, strategy, startingFieldName);
final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
final SchemaApplicationStrategy schemaApplicationStrategy)
throws IOException, MalformedRecordException {
super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName);
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
} else {
this.schema = schema;
}
}
private RecordSchema getSelectedSchema(final RecordSchema schema, final String startingFieldName) {
final Queue<RecordSchema> schemas = new LinkedList<>();
schemas.add(schema);
while (!schemas.isEmpty()) {
final RecordSchema currentSchema = schemas.poll();
final Optional<RecordField> optionalRecordField = currentSchema.getField(startingFieldName);
if (optionalRecordField.isPresent()) {
return getChildSchemaFromField(optionalRecordField.get());
} else {
for (RecordField field : currentSchema.getFields()) {
if (field.getDataType() instanceof ArrayDataType || field.getDataType() instanceof RecordDataType) {
schemas.add(getChildSchemaFromField(field));
}
}
}
}
throw new RuntimeException(String.format("Selected schema field [%s] not found.", startingFieldName));
}
private RecordSchema getChildSchemaFromField(final RecordField recordField) {
if (recordField.getDataType() instanceof ArrayDataType) {
return ((RecordDataType) ((ArrayDataType) recordField.getDataType()).getElementType()).getChildSchema();
} else if (recordField.getDataType() instanceof RecordDataType) {
return ((RecordDataType) recordField.getDataType()).getChildSchema();
} else
throw new RuntimeException(String.format("Selected schema field [%s] is not record or array type.", recordField.getFieldName()));
}
@Override
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields)

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.json;
import org.apache.nifi.components.DescribedValue;
public enum SchemaApplicationStrategy implements DescribedValue {
WHOLE_JSON(
"Whole JSON",
"Applies the schema for the whole JSON."
),
SELECTED_PART(
"Selected Part",
"Applies the schema for the selected part starting from the \"Starting Field Name\"."
);
private final String displayName;
private final String description;
SchemaApplicationStrategy(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
@Override
public String getValue() {
return name();
}
}

View File

@ -16,9 +16,17 @@
*/
package org.apache.nifi.json;
public enum StartingFieldStrategy {
ROOT_NODE("Root Node", "Begins processing from the root node."),
NESTED_FIELD("Nested Field", "Skips forward to the given nested JSON field (array or object) to begin processing.");
import org.apache.nifi.components.DescribedValue;
public enum StartingFieldStrategy implements DescribedValue {
ROOT_NODE(
"Root Node",
"Begins processing from the root node."
),
NESTED_FIELD(
"Nested Field",
"Skips forward to the given nested JSON field (array or object) to begin processing."
);
private final String displayName;
private final String description;
@ -28,11 +36,18 @@ public enum StartingFieldStrategy {
this.description = description;
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
@Override
public String getValue() {
return name();
}
}

View File

@ -407,5 +407,12 @@
</tr>
</table>
<h2>Schema Application Strategies</h2>
<p>
When using JsonTreeReader with "Nested Field Strategy" and the "Schema Access Strategy" is not "Infer Schema",
it can be configured for the entire original JSON ("Whole JSON" strategy) or for the nested field section ("Selected part" strategy).
</p>
</body>
</html>

View File

@ -93,6 +93,13 @@ class TestJsonTreeRowRecordReader {
return new SimpleRecordSchema(accountFields);
}
private RecordSchema getSchema() {
final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final List<RecordField> fields = getDefaultFields();
fields.add(new RecordField("account", accountType));
fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
return new SimpleRecordSchema(fields);
}
@Test
void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
@ -1050,7 +1057,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartsFromNestedObject() throws IOException, MalformedRecordException {
void testStartFromNestedObject() throws IOException, MalformedRecordException {
String jsonPath = "src/test/resources/json/single-element-nested.json";
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1069,7 +1076,7 @@ class TestJsonTreeRowRecordReader {
}
@Test
void testStartsFromMultipleNestedField() throws IOException, MalformedRecordException {
void testStartFromMultipleNestedField() throws IOException, MalformedRecordException {
String jsonPath = "src/test/resources/json/multiple-nested-field.json";
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
@ -1105,7 +1112,8 @@ class TestJsonTreeRowRecordReader {
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(getDefaultFields());
List<Object> expected = Collections.emptyList();
testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, "notfound");
testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD,
"notfound", SchemaApplicationStrategy.SELECTED_PART);
}
@Test
@ -1128,29 +1136,119 @@ class TestJsonTreeRowRecordReader {
}})
);
testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, "accounts");
testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD,
"accounts", SchemaApplicationStrategy.SELECTED_PART);
}
@Test
void testStartFromNestedObjectWithWholeJsonSchemaScope() throws IOException, MalformedRecordException {
String jsonPath = "src/test/resources/json/single-element-nested.json";
RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
));
RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList(
new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema))
));
RecordSchema expectedRecordSchema = accountSchema;
List<Object> expected = Collections.singletonList(
new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
put("id", 42);
put("balance", 4750.89);
}})
);
testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD,
"account", SchemaApplicationStrategy.WHOLE_JSON);
}
@Test
void testStartFromNestedArrayWithWholeJsonSchemaScope() throws IOException, MalformedRecordException {
String jsonPath = "src/test/resources/json/single-element-nested-array.json";
RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
));
RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList(
new RecordField("accounts", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountSchema)))
));
RecordSchema expectedRecordSchema = accountSchema;
List<Object> expected = Arrays.asList(
new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
put("id", 42);
put("balance", 4750.89);
}}),
new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
put("id", 43);
put("balance", 48212.38);
}})
);
testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD,
"accounts", SchemaApplicationStrategy.WHOLE_JSON);
}
@Test
void testStartFromDeepNestedObject() throws IOException, MalformedRecordException {
String jsonPath = "src/test/resources/json/single-element-deep-nested.json";
RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("rootInt", RecordFieldType.INT.getDataType()),
new RecordField("rootString", RecordFieldType.STRING.getDataType()),
new RecordField("nestedLevel1Record", RecordFieldType.RECORD.getRecordDataType(
new SimpleRecordSchema(Arrays.asList(
new RecordField("nestedLevel1Int", RecordFieldType.INT.getDataType()),
new RecordField("nestedLevel1String", RecordFieldType.STRING.getDataType()),
new RecordField("nestedLevel2Record", RecordFieldType.RECORD.getRecordDataType(
new SimpleRecordSchema(Arrays.asList(
new RecordField("nestedLevel2Int", RecordFieldType.INT.getDataType()),
new RecordField("nestedLevel2String", RecordFieldType.STRING.getDataType())
))
))
))
))
));
SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
new RecordField("nestedLevel2Int", RecordFieldType.INT.getDataType()),
new RecordField("nestedLevel2String", RecordFieldType.STRING.getDataType())
));
List<Object> expected = Collections.singletonList(
new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
put("nestedLevel2Int", 111);
put("nestedLevel2String", "root.level1.level2:string");
}})
);
testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD,
"nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON);
}
private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
// GIVEN
final File jsonFile = new File(jsonPath);
try (
InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
) {
RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null);
// WHEN
// THEN
testReadRecords(jsonStream, schema, expected);
}
}
private void testReadRecords(String jsonPath, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException {
private void testReadRecords(String jsonPath, List<Object> expected, StartingFieldStrategy strategy,
String startingFieldName) throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
RecordSchema schema = inferSchema(jsonStream, strategy, startingFieldName);
testReadRecords(jsonStream, schema, expected, strategy, startingFieldName);
testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, SchemaApplicationStrategy.SELECTED_PART);
}
}
@ -1161,10 +1259,11 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException {
private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy,
String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected, strategy, startingFieldName);
testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy);
}
}
@ -1195,10 +1294,11 @@ class TestJsonTreeRowRecordReader {
}
}
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName)
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy,
String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy)
throws IOException, MalformedRecordException {
try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
strategy, startingFieldName)) {
strategy, startingFieldName, schemaApplicationStrategy)) {
List<Object> actual = new ArrayList<>();
Record record;

View File

@ -0,0 +1,12 @@
{
"rootInt": 100,
"rootString": "root_string",
"nestedLevel1Record": {
"nestedLevel1Int": 110,
"nestedLevel1String": "root.level1:string",
"nestedLevel2Record": {
"nestedLevel2Int": 111,
"nestedLevel2String": "root.level1.level2:string"
}
}
}