diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 19f47be8f1..7b60b842bd 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -501,9 +501,13 @@ src/test/resources/TestUpdateRecord/output/person-with-firstname.json src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json + src/test/resources/TestUpdateRecord/output/name-fields-only.json + src/test/resources/TestUpdateRecord/output/name-and-mother-same.json src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc + src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc + src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java index abe29a2838..b2c8002604 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -42,13 +42,15 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.record.path.FieldValue; import org.apache.nifi.record.path.RecordPath; import org.apache.nifi.record.path.RecordPathResult; import org.apache.nifi.record.path.util.RecordPathCache; import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.util.DataTypeUtils; @@ -139,7 +141,7 @@ public class UpdateRecord extends AbstractRecordProcessor { } @Override - protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) { + protected Record process(Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) { final boolean evaluateValueAsRecordPath = context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue()); // Incorporate the RecordSchema that we will use for writing records into the Schema that we have @@ -157,9 +159,9 @@ public class UpdateRecord extends AbstractRecordProcessor { // If we have an Absolute RecordPath, we need to evaluate the RecordPath only once against the Record. // If the RecordPath is a Relative Path, then we have to evaluate it against each FieldValue. if (replacementRecordPath.isAbsolute()) { - processAbsolutePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue); + record = processAbsolutePath(replacementRecordPath, result.getSelectedFields(), record); } else { - processRelativePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue); + record = processRelativePath(replacementRecordPath, result.getSelectedFields(), record); } } else { final PropertyValue replacementValue = context.getProperty(recordPathText); @@ -180,26 +182,65 @@ public class UpdateRecord extends AbstractRecordProcessor { return record; } - private void processAbsolutePath(final RecordPath replacementRecordPath, final Stream destinationFields, final Record record, final String replacementValue) { + private Record processAbsolutePath(final RecordPath replacementRecordPath, final Stream destinationFields, final Record record) { final RecordPathResult replacementResult = replacementRecordPath.evaluate(record); - final Object replacementObject = getReplacementObject(replacementResult, replacementValue); - destinationFields.forEach(fieldVal -> fieldVal.updateValue(replacementObject)); + final List selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList()); + final List destinationFieldValues = destinationFields.collect(Collectors.toList()); + + return updateRecord(destinationFieldValues, selectedFields, record); } - private void processRelativePath(final RecordPath replacementRecordPath, final Stream destinationFields, final Record record, final String replacementValue) { - destinationFields.forEach(fieldVal -> { + private Record processRelativePath(final RecordPath replacementRecordPath, final Stream destinationFields, Record record) { + final List destinationFieldValues = destinationFields.collect(Collectors.toList()); + + for (final FieldValue fieldVal : destinationFieldValues) { final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal); - final Object replacementObject = getReplacementObject(replacementResult, replacementValue); + final List selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList()); + final Object replacementObject = getReplacementObject(selectedFields); fieldVal.updateValue(replacementObject); - }); + + record = updateRecord(destinationFieldValues, selectedFields, record); + } + + return record; } - private Object getReplacementObject(final RecordPathResult recordPathResult, final String replacementValue) { - final List selectedFields = recordPathResult.getSelectedFields().collect(Collectors.toList()); + private Record updateRecord(final List destinationFields, final List selectedFields, final Record record) { + if (destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent()) { + final Object replacement = getReplacementObject(selectedFields); + if (replacement == null) { + return record; + } + if (replacement instanceof Record) { + return (Record) replacement; + } + final List fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList()); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record mapRecord = new MapRecord(schema, new HashMap<>()); + for (final FieldValue selectedField : selectedFields) { + mapRecord.setValue(selectedField.getField().getFieldName(), selectedField.getValue()); + } + + return mapRecord; + } else { + for (final FieldValue fieldVal : destinationFields) { + fieldVal.updateValue(getReplacementObject(selectedFields)); + } + return record; + } + } + + private Object getReplacementObject(final List selectedFields) { if (selectedFields.size() > 1) { - throw new ProcessException("Cannot update Record because the Replacement Record Path \"" + replacementValue + "\" yielded " - + selectedFields.size() + " results but this Processor only supports a single result."); + final List fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList()); + final RecordSchema schema = new SimpleRecordSchema(fields); + final Record record = new MapRecord(schema, new HashMap<>()); + for (final FieldValue fieldVal : selectedFields) { + record.setValue(fieldVal.getField().getFieldName(), fieldVal.getValue()); + } + + return record; } if (selectedFields.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java index 2c1f6ffb99..aa675b8652 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -244,4 +244,156 @@ public class TestUpdateRecord { final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json"))); runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); } + + @Test + public void testSetRootPathAbsoluteWithMultipleValues() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/", "/name/*"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testSetRootPathAbsoluteWithSingleValue() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/", "/name"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + + @Test + public void testSetRootPathRelativeWithMultipleValues() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/name/..", "/name/*"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testSetRootPathRelativeWithSingleValue() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/name/..", "/name"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testSetAbsolutePathWithAnotherRecord() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/name", "/mother"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-and-mother-same.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json index 7538381153..f342ee9727 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json @@ -3,5 +3,9 @@ "name": { "last": "Doe", "first": "John" + }, + "mother": { + "last": "Doe", + "first": "Jane" } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/name-and-mother-same.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/name-and-mother-same.json new file mode 100644 index 0000000000..ed0133a76b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/name-and-mother-same.json @@ -0,0 +1,11 @@ +[ { + "id" : 485, + "name" : { + "last" : "Doe", + "first" : "Jane" + }, + "mother" : { + "last" : "Doe", + "first" : "Jane" + } +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/name-fields-only.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/name-fields-only.json new file mode 100644 index 0000000000..61f0fd795e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/name-fields-only.json @@ -0,0 +1,4 @@ +[ { + "last" : "Doe", + "first" : "John" +} ] \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc new file mode 100644 index 0000000000..94f2223523 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc @@ -0,0 +1,9 @@ +{ + "name": "nameFieldsOnly", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "last", "type": "string" }, + { "name": "first", "type": "string" } + ] +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc new file mode 100644 index 0000000000..27cc0b64ff --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc @@ -0,0 +1,17 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": { + "type": "record", + "name": "nameRecord", + "fields": [ + { "name": "last", "type": "string" }, + { "name": "first", "type": "string" } + ] + }}, + { "name": "mother", "type": "nifi.nameRecord" } + ] +} \ No newline at end of file