From d92598981124e1b73b958c54b9d4dd9b47f0b298 Mon Sep 17 00:00:00 2001 From: Lehel Date: Wed, 4 May 2022 22:14:51 +0200 Subject: [PATCH] NIFI-9963: Configure JsonTreeReader whether the schema applies to the Root Node or the Nested Node This closes #6018. Signed-off-by: Tamas Palfy --- .../salesforce/QuerySalesforceObject.java | 4 +- .../pom.xml | 1 + .../json/AbstractJsonRowRecordReader.java | 21 +-- .../org/apache/nifi/json/JsonTreeReader.java | 31 ++++- .../nifi/json/JsonTreeRowRecordReader.java | 58 ++++++-- .../nifi/json/SchemaApplicationStrategy.java | 53 +++++++ .../nifi/json/StartingFieldStrategy.java | 21 ++- .../additionalDetails.html | 7 + .../json/TestJsonTreeRowRecordReader.java | 130 ++++++++++++++++-- .../json/single-element-deep-nested.json | 12 ++ 10 files changed, 282 insertions(+), 56 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/SchemaApplicationStrategy.java create mode 100644 nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-deep-nested.json diff --git a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java index c6cd3d25f9..d766d86023 100644 --- a/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java +++ b/nifi-nar-bundles/nifi-salesforce-bundle/nifi-salesforce-processors/src/main/java/org/apache/nifi/processors/salesforce/QuerySalesforceObject.java @@ -36,6 +36,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import 
org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.json.JsonTreeRowRecordReader; +import org.apache.nifi.json.SchemaApplicationStrategy; import org.apache.nifi.json.StartingFieldStrategy; import org.apache.nifi.oauth2.OAuth2AccessTokenProvider; import org.apache.nifi.processor.AbstractProcessor; @@ -347,7 +348,8 @@ public class QuerySalesforceObject extends AbstractProcessor { TIME_FORMAT, DATE_TIME_FORMAT, StartingFieldStrategy.NESTED_FIELD, - STARTING_FIELD_NAME + STARTING_FIELD_NAME, + SchemaApplicationStrategy.SELECTED_PART ); RecordSetWriter writer = writerFactory.createWriter( diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index a04d291386..c840771427 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -222,6 +222,7 @@ src/test/resources/json/single-bank-account.json src/test/resources/json/single-bank-account-wrong-field-type.json src/test/resources/json/single-element-nested-array.json + src/test/resources/json/single-element-deep-nested.json src/test/resources/json/single-element-nested.json src/test/resources/json/single-element-nested-array-middle.json src/test/resources/json/nested-array-then-start-object.json diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java index 
e248bfd518..a3515d03db 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java @@ -64,7 +64,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { private JsonNode firstJsonNode; private StartingFieldStrategy strategy; - private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) { this.logger = logger; @@ -80,25 +79,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader { protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { - this(logger, dateFormat, timeFormat, timestampFormat); - - try { - jsonParser = jsonFactory.createParser(in); - jsonParser.setCodec(codec); - - JsonToken token = jsonParser.nextToken(); - if (token == JsonToken.START_ARRAY) { - token = jsonParser.nextToken(); // advance to START_OBJECT token - } - - if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also - firstJsonNode = jsonParser.readValueAsTree(); - } else { - firstJsonNode = null; - } - } catch (final JsonParseException e) { - throw new MalformedRecordException("Could not parse data as JSON", e); - } + this(in, logger, dateFormat, timeFormat, timestampFormat, null, null); } protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat, diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java index e5a2ed20fd..5de7d74c7d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java @@ -45,12 +45,16 @@ import org.apache.nifi.serialization.record.RecordSchema; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Supplier; +import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA; +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODED_SCHEMA; +import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; import static org.apache.nifi.schema.inference.SchemaInferenceUtil.INFER_SCHEMA; import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE; @@ -70,6 +74,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade private volatile String timestampFormat; private volatile String startingFieldName; private volatile StartingFieldStrategy startingFieldStrategy; + private volatile SchemaApplicationStrategy 
schemaApplicationStrategy; + public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder() .name("starting-field-strategy") @@ -77,11 +83,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade .description("Start processing from the root node or from a specified nested node.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .defaultValue(StartingFieldStrategy.ROOT_NODE.name()) - .allowableValues( - Arrays.stream(StartingFieldStrategy.values()).map(startingStrategy -> - new AllowableValue(startingStrategy.name(), startingStrategy.getDisplayName(), startingStrategy.getDescription()) - ).toArray(AllowableValue[]::new)) + .defaultValue(StartingFieldStrategy.ROOT_NODE.getValue()) + .allowableValues(StartingFieldStrategy.class) .build(); @@ -95,6 +98,18 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade .dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name()) .build(); + public static final PropertyDescriptor SCHEMA_APPLICATION_STRATEGY = new PropertyDescriptor.Builder() + .name("schema-application-strategy") + .displayName("Schema Application Strategy") + .description("Specifies whether the schema is defined for the whole JSON or for the selected part starting from \"Starting Field Name\".") + .required(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .defaultValue(SchemaApplicationStrategy.SELECTED_PART.getValue()) + .dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name()) + .dependsOn(SCHEMA_ACCESS_STRATEGY, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA) + .allowableValues(SchemaApplicationStrategy.class) + .build(); + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); @@ -104,6 +119,7 @@ public class JsonTreeReader 
extends SchemaRegistryService implements RecordReade .build()); properties.add(STARTING_FIELD_STRATEGY); properties.add(STARTING_FIELD_NAME); + properties.add(SCHEMA_APPLICATION_STRATEGY); properties.add(DateTimeUtils.DATE_FORMAT); properties.add(DateTimeUtils.TIME_FORMAT); properties.add(DateTimeUtils.TIMESTAMP_FORMAT); @@ -117,6 +133,7 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue()); this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue(); + this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue()); } @Override @@ -148,6 +165,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade public RecordReader createRecordReader(final Map variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException { final RecordSchema schema = getSchema(variables, in, null); - return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName); + return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy); } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java index eee487c026..daa518e15c 100644 --- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java @@ -39,35 +39,73 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Queue; import java.util.function.Supplier; public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { + private final RecordSchema schema; - public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, - final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { - super(in, logger, dateFormat, timeFormat, timestampFormat); - this.schema = schema; + final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException { + this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null); } public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final String dateFormat, final String timeFormat, final String timestampFormat, - final StartingFieldStrategy strategy, final String startingFieldName) throws IOException, MalformedRecordException { - super(in, logger, dateFormat, timeFormat, timestampFormat, strategy, startingFieldName); - this.schema = schema; + final StartingFieldStrategy startingFieldStrategy, final String startingFieldName, + final SchemaApplicationStrategy schemaApplicationStrategy) + throws IOException, MalformedRecordException { + + super(in, logger, dateFormat, timeFormat, timestampFormat, 
startingFieldStrategy, startingFieldName); + if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) { + this.schema = getSelectedSchema(schema, startingFieldName); + } else { + this.schema = schema; + } + } + + private RecordSchema getSelectedSchema(final RecordSchema schema, final String startingFieldName) { + final Queue schemas = new LinkedList<>(); + schemas.add(schema); + while (!schemas.isEmpty()) { + final RecordSchema currentSchema = schemas.poll(); + final Optional optionalRecordField = currentSchema.getField(startingFieldName); + if (optionalRecordField.isPresent()) { + return getChildSchemaFromField(optionalRecordField.get()); + } else { + for (RecordField field : currentSchema.getFields()) { + if (field.getDataType() instanceof ArrayDataType || field.getDataType() instanceof RecordDataType) { + schemas.add(getChildSchemaFromField(field)); + } + } + } + + } + throw new RuntimeException(String.format("Selected schema field [%s] not found.", startingFieldName)); + } + + private RecordSchema getChildSchemaFromField(final RecordField recordField) { + if (recordField.getDataType() instanceof ArrayDataType) { + return ((RecordDataType) ((ArrayDataType) recordField.getDataType()).getElementType()).getChildSchema(); + } else if (recordField.getDataType() instanceof RecordDataType) { + return ((RecordDataType) recordField.getDataType()).getChildSchema(); + } else + throw new RuntimeException(String.format("Selected schema field [%s] is not record or array type.", recordField.getFieldName())); } @Override protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields) - throws IOException, MalformedRecordException { + throws IOException, MalformedRecordException { return convertJsonNodeToRecord(jsonNode, schema, coerceTypes, dropUnknownFields, null); } private Record convertJsonNodeToRecord(final 
JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknown, final String fieldNamePrefix) - throws IOException, MalformedRecordException { + throws IOException, MalformedRecordException { if (jsonNode == null) { return null; } @@ -90,7 +128,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader { } private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix, - final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { + final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException { final Map values = new HashMap<>(schema.getFieldCount() * 2); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/SchemaApplicationStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/SchemaApplicationStrategy.java new file mode 100644 index 0000000000..a6cde20267 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/SchemaApplicationStrategy.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.json; + +import org.apache.nifi.components.DescribedValue; + +public enum SchemaApplicationStrategy implements DescribedValue { + WHOLE_JSON( + "Whole JSON", + "Applies the schema for the whole JSON." + ), + SELECTED_PART( + "Selected Part", + "Applies the schema for the selected part starting from the \"Starting Field Name\"." + ); + + private final String displayName; + private final String description; + + SchemaApplicationStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String getValue() { + return name(); + } +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java index 44727e72fd..2ba2f7301b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java 
@@ -16,9 +16,17 @@ */ package org.apache.nifi.json; -public enum StartingFieldStrategy { - ROOT_NODE("Root Node", "Begins processing from the root node."), - NESTED_FIELD("Nested Field", "Skips forward to the given nested JSON field (array or object) to begin processing."); +import org.apache.nifi.components.DescribedValue; + +public enum StartingFieldStrategy implements DescribedValue { + ROOT_NODE( + "Root Node", + "Begins processing from the root node." + ), + NESTED_FIELD( + "Nested Field", + "Skips forward to the given nested JSON field (array or object) to begin processing." + ); private final String displayName; private final String description; @@ -28,11 +36,18 @@ public enum StartingFieldStrategy { this.description = description; } + @Override public String getDisplayName() { return displayName; } + @Override public String getDescription() { return description; } + + @Override + public String getValue() { + return name(); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html index 9a563d4f65..f46b523758 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html @@ -407,5 +407,12 @@ +

Schema Application Strategies

+ +

+ When using JsonTreeReader with the "Nested Field" starting field strategy and a "Schema Access Strategy" other than "Infer Schema", + the schema can be defined either for the entire original JSON ("Whole JSON" strategy) or only for the nested field section ("Selected Part" strategy). +

+ diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java index 752d1a6676..06d8acd2cc 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java @@ -93,6 +93,13 @@ class TestJsonTreeRowRecordReader { return new SimpleRecordSchema(accountFields); } + private RecordSchema getSchema() { + final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema()); + final List fields = getDefaultFields(); + fields.add(new RecordField("account", accountType)); + fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType())); + return new SimpleRecordSchema(fields); + } @Test void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException { @@ -1050,7 +1057,7 @@ class TestJsonTreeRowRecordReader { } @Test - void testStartsFromNestedObject() throws IOException, MalformedRecordException { + void testStartFromNestedObject() throws IOException, MalformedRecordException { String jsonPath = "src/test/resources/json/single-element-nested.json"; SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1069,7 +1076,7 @@ class TestJsonTreeRowRecordReader { } @Test - void testStartsFromMultipleNestedField() throws IOException, MalformedRecordException { + void testStartFromMultipleNestedField() throws IOException, MalformedRecordException { String 
jsonPath = "src/test/resources/json/multiple-nested-field.json"; SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( @@ -1105,7 +1112,8 @@ class TestJsonTreeRowRecordReader { SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(getDefaultFields()); List expected = Collections.emptyList(); - testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, "notfound"); + testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, + "notfound", SchemaApplicationStrategy.SELECTED_PART); } @Test @@ -1128,29 +1136,119 @@ class TestJsonTreeRowRecordReader { }}) ); - testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, "accounts"); + testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, + "accounts", SchemaApplicationStrategy.SELECTED_PART); + } + + @Test + void testStartFromNestedObjectWithWholeJsonSchemaScope() throws IOException, MalformedRecordException { + String jsonPath = "src/test/resources/json/single-element-nested.json"; + + RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) + )); + + RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList( + new RecordField("account", RecordFieldType.RECORD.getRecordDataType(accountSchema)) + )); + + RecordSchema expectedRecordSchema = accountSchema; + + List expected = Collections.singletonList( + new MapRecord(expectedRecordSchema, new HashMap() {{ + put("id", 42); + put("balance", 4750.89); + }}) + ); + + testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD, + "account", SchemaApplicationStrategy.WHOLE_JSON); + } + + @Test + void testStartFromNestedArrayWithWholeJsonSchemaScope() throws IOException, MalformedRecordException { + String jsonPath = 
"src/test/resources/json/single-element-nested-array.json"; + + RecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("id", RecordFieldType.INT.getDataType()), + new RecordField("balance", RecordFieldType.DOUBLE.getDataType()) + )); + + RecordSchema recordSchema = new SimpleRecordSchema(Collections.singletonList( + new RecordField("accounts", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(accountSchema))) + )); + + RecordSchema expectedRecordSchema = accountSchema; + + List expected = Arrays.asList( + new MapRecord(expectedRecordSchema, new HashMap() {{ + put("id", 42); + put("balance", 4750.89); + }}), + new MapRecord(expectedRecordSchema, new HashMap() {{ + put("id", 43); + put("balance", 48212.38); + }}) + ); + + testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD, + "accounts", SchemaApplicationStrategy.WHOLE_JSON); + } + + @Test + void testStartFromDeepNestedObject() throws IOException, MalformedRecordException { + String jsonPath = "src/test/resources/json/single-element-deep-nested.json"; + + RecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList( + new RecordField("rootInt", RecordFieldType.INT.getDataType()), + new RecordField("rootString", RecordFieldType.STRING.getDataType()), + new RecordField("nestedLevel1Record", RecordFieldType.RECORD.getRecordDataType( + new SimpleRecordSchema(Arrays.asList( + new RecordField("nestedLevel1Int", RecordFieldType.INT.getDataType()), + new RecordField("nestedLevel1String", RecordFieldType.STRING.getDataType()), + new RecordField("nestedLevel2Record", RecordFieldType.RECORD.getRecordDataType( + new SimpleRecordSchema(Arrays.asList( + new RecordField("nestedLevel2Int", RecordFieldType.INT.getDataType()), + new RecordField("nestedLevel2String", RecordFieldType.STRING.getDataType()) + )) + )) + )) + )) + )); + + SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList( + new 
RecordField("nestedLevel2Int", RecordFieldType.INT.getDataType()), + new RecordField("nestedLevel2String", RecordFieldType.STRING.getDataType()) + )); + + List expected = Collections.singletonList( + new MapRecord(expectedRecordSchema, new HashMap() {{ + put("nestedLevel2Int", 111); + put("nestedLevel2String", "root.level1.level2:string"); + }}) + ); + + testReadRecords(jsonPath, recordSchema, expected, StartingFieldStrategy.NESTED_FIELD, + "nestedLevel2Record", SchemaApplicationStrategy.WHOLE_JSON); } private void testReadRecords(String jsonPath, List expected) throws IOException, MalformedRecordException { - // GIVEN final File jsonFile = new File(jsonPath); - try ( InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile)) ) { RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null); - - // WHEN - // THEN testReadRecords(jsonStream, schema, expected); } } - private void testReadRecords(String jsonPath, List expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException { + private void testReadRecords(String jsonPath, List expected, StartingFieldStrategy strategy, + String startingFieldName) throws IOException, MalformedRecordException { final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { RecordSchema schema = inferSchema(jsonStream, strategy, startingFieldName); - testReadRecords(jsonStream, schema, expected, strategy, startingFieldName); + testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, SchemaApplicationStrategy.SELECTED_PART); } } @@ -1161,10 +1259,11 @@ class TestJsonTreeRowRecordReader { } } - private void testReadRecords(String jsonPath, RecordSchema schema, List expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException { + private void testReadRecords(String jsonPath, 
RecordSchema schema, List expected, StartingFieldStrategy strategy, + String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException { final File jsonFile = new File(jsonPath); try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) { - testReadRecords(jsonStream, schema, expected, strategy, startingFieldName); + testReadRecords(jsonStream, schema, expected, strategy, startingFieldName, schemaApplicationStrategy); } } @@ -1195,10 +1294,11 @@ class TestJsonTreeRowRecordReader { } } - private void testReadRecords(InputStream jsonStream, RecordSchema schema, List expected, StartingFieldStrategy strategy, String startingFieldName) + private void testReadRecords(InputStream jsonStream, RecordSchema schema, List expected, StartingFieldStrategy strategy, + String startingFieldName, SchemaApplicationStrategy schemaApplicationStrategy) throws IOException, MalformedRecordException { try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, - strategy, startingFieldName)) { + strategy, startingFieldName, schemaApplicationStrategy)) { List actual = new ArrayList<>(); Record record; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-deep-nested.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-deep-nested.json new file mode 100644 index 0000000000..fbbfd636b6 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-deep-nested.json @@ -0,0 +1,12 @@ +{ + "rootInt": 100, + "rootString": "root_string", + "nestedLevel1Record": { + 
"nestedLevel1Int": 110, + "nestedLevel1String": "root.level1:string", + "nestedLevel2Record": { + "nestedLevel2Int": 111, + "nestedLevel2String": "root.level1.level2:string" + } + } +} \ No newline at end of file