diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java index 2f8a766923..ce8708c173 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java @@ -20,7 +20,9 @@ package org.apache.nifi.serialization.record; import org.apache.nifi.serialization.SchemaValidationException; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.ChoiceDataType; import org.apache.nifi.serialization.record.type.MapDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; @@ -434,22 +436,125 @@ public class MapRecord implements Record { this.schema = DataTypeUtils.merge(this.schema, other); } + + + @Override public void incorporateInactiveFields() { - if (inactiveFields == null) { - return; + final List updatedFields = new ArrayList<>(); + + for (final RecordField field : schema.getFields()) { + updatedFields.add(getUpdatedRecordField(field)); } - final List allFields = new ArrayList<>(schema.getFieldCount() + inactiveFields.size()); - allFields.addAll(schema.getFields()); - - for (final RecordField field : inactiveFields) { - if (!allFields.contains(field)) { - allFields.add(field); + if (inactiveFields != null) { + for (final RecordField field : inactiveFields) { + if (!updatedFields.contains(field)) { + updatedFields.add(field); + } } } - this.schema = new SimpleRecordSchema(allFields); + this.schema = new SimpleRecordSchema(updatedFields); + } + + private RecordField getUpdatedRecordField(final RecordField field) { + final DataType dataType = field.getDataType(); + final RecordFieldType fieldType = dataType.getFieldType(); + + if (isSimpleType(fieldType)) { + return field; + } + + final Object value = getValue(field); + if (value == null) { + return field; + } + + if (fieldType == RecordFieldType.RECORD && value instanceof Record) { + final Record childRecord = (Record) value; + childRecord.incorporateInactiveFields(); + + final RecordSchema definedChildSchema = ((RecordDataType) dataType).getChildSchema(); + final RecordSchema actualChildSchema = childRecord.getSchema(); + final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema); + final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema); + + final RecordField updatedField = new RecordField(field.getFieldName(), combinedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable()); + return updatedField; + } + + if (fieldType == RecordFieldType.ARRAY && value instanceof Object[]) { + final DataType elementType = ((ArrayDataType) dataType).getElementType(); + final RecordFieldType elementFieldType = elementType.getFieldType(); + + if (elementFieldType == RecordFieldType.RECORD) { + final Object[] array = (Object[]) value; + RecordSchema mergedSchema = ((RecordDataType) elementType).getChildSchema(); + + for (final Object element : array) { + if (element == null) { + continue; + } + + final Record record = (Record) element; + record.incorporateInactiveFields(); + mergedSchema = DataTypeUtils.merge(mergedSchema, record.getSchema()); + } + + final DataType mergedRecordType = RecordFieldType.RECORD.getRecordDataType(mergedSchema); + final DataType mergedDataType = RecordFieldType.ARRAY.getArrayDataType(mergedRecordType); + final RecordField updatedField = new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable()); + return updatedField; + } + + return field; + } + + if (fieldType == RecordFieldType.CHOICE) { + final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; + final List possibleTypes = choiceDataType.getPossibleSubTypes(); + + final DataType chosenDataType = DataTypeUtils.chooseDataType(value, choiceDataType); + if (chosenDataType.getFieldType() != RecordFieldType.RECORD || !(value instanceof Record)) { + return field; + } + + final RecordDataType recordDataType = (RecordDataType) chosenDataType; + final Record childRecord = (Record) value; + childRecord.incorporateInactiveFields(); + + final RecordSchema definedChildSchema = recordDataType.getChildSchema(); + final RecordSchema actualChildSchema = childRecord.getSchema(); + final RecordSchema combinedChildSchema = DataTypeUtils.merge(definedChildSchema, actualChildSchema); + final DataType combinedDataType = RecordFieldType.RECORD.getRecordDataType(combinedChildSchema); + + final List updatedPossibleTypes = new ArrayList<>(possibleTypes.size()); + for (final DataType possibleType : possibleTypes) { + if (possibleType.equals(chosenDataType)) { + updatedPossibleTypes.add(combinedDataType); + } else { + updatedPossibleTypes.add(possibleType); + } + } + + final DataType mergedDataType = RecordFieldType.CHOICE.getChoiceDataType(updatedPossibleTypes); + return new RecordField(field.getFieldName(), mergedDataType, field.getDefaultValue(), field.getAliases(), field.isNullable()); + } + + return field; + } + + private boolean isSimpleType(final RecordFieldType fieldType) { + switch (fieldType) { + case ARRAY: + case RECORD: + case MAP: + case CHOICE: + return false; + } + + return true; } @Override diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index ed51c40805..5a3cee4492 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -1,13 +1,13 @@ - 4.0.0 @@ -554,6 +554,8 @@ src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-streets.json + src/test/resources/TestUpdateRecord/input/addresses.json + src/test/resources/TestUpdateRecord/output/full-addresses.json src/test/resources/xxe_template.xml src/test/resources/xxe_from_report.xml src/test/resources/TestForkRecord/single-element-nested-array-strings.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index 20e5dd100d..5af5262968 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -203,8 +203,6 @@ public class UpdateRecord extends AbstractRecordProcessor { final List selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList()); final Object replacementObject = getReplacementObject(selectedFields); updateFieldValue(fieldVal, replacementObject); - - record = updateRecord(destinationFieldValues, selectedFields, record); } return record; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java index e732695bc1..daf57a86b7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -17,19 +17,12 @@ package org.apache.nifi.processors.standard; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Collections; - import org.apache.commons.lang3.StringUtils; import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.RecordFieldType; @@ -39,6 +32,14 @@ import org.apache.nifi.util.TestRunners; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestUpdateRecord { private TestRunner runner; @@ -158,6 +159,31 @@ public class TestUpdateRecord { mff.assertContentEquals("header\nJohnny,Johnny\n"); } + @Test + public void testConcatWithArrayInferredSchema() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/addresses.json")); + runner.setProperty("/addresses[*]/full", "concat(../street, ' ', ../city, ' ', ../state)"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/full-addresses.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + @Test public void testChangingSchema() throws InitializationException, IOException { final JsonTreeReader jsonReader = new JsonTreeReader(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/addresses.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/addresses.json new file mode 100644 index 0000000000..3e0abb0d8e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/addresses.json @@ -0,0 +1,11 @@ +[ { + "addresses" : [ { + "street" : "1234 My Street", + "city" : "My City", + "state" : "MS" + }, { + "street" : "4321 Your Street", + "city" : "Your City", + "state" : "YS" + } ] +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/full-addresses.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/full-addresses.json new file mode 100644 index 0000000000..bf45236f26 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/full-addresses.json @@ -0,0 +1,13 @@ +[ { + "addresses" : [ { + "street" : "1234 My Street", + "city" : "My City", + "state" : "MS", + "full" : "1234 My Street My City MS" + }, { + "street" : "4321 Your Street", + "city" : "Your City", + "state" : "YS", + "full" : "4321 Your Street Your City YS" + } ] +} ] \ No newline at end of file