NIFI-4306: Allow the root field to be replaced by a child field, for instance by adding a property with the name / and the value /child

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2103.
This commit is contained in:
Mark Payne 2017-08-21 11:12:13 -04:00 committed by Pierre Villard
parent 6559604456
commit 9ebf2cfaf1
8 changed files with 257 additions and 15 deletions

View File

@ -501,9 +501,13 @@
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/name-fields-only.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/output/name-and-mother-same.json</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude>
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude>
<!-- This file is copied from https://github.com/jeremyh/jBCrypt
because the binary is compiled for Java 8 and we must support Java 7 -->
<exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>

View File

@ -42,13 +42,15 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
@ -139,7 +141,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
}
@Override
protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) {
protected Record process(Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) {
final boolean evaluateValueAsRecordPath = context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue());
// Incorporate the RecordSchema that we will use for writing records into the Schema that we have
@ -157,9 +159,9 @@ public class UpdateRecord extends AbstractRecordProcessor {
// If we have an Absolute RecordPath, we need to evaluate the RecordPath only once against the Record.
// If the RecordPath is a Relative Path, then we have to evaluate it against each FieldValue.
if (replacementRecordPath.isAbsolute()) {
processAbsolutePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue);
record = processAbsolutePath(replacementRecordPath, result.getSelectedFields(), record);
} else {
processRelativePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue);
record = processRelativePath(replacementRecordPath, result.getSelectedFields(), record);
}
} else {
final PropertyValue replacementValue = context.getProperty(recordPathText);
@ -180,26 +182,65 @@ public class UpdateRecord extends AbstractRecordProcessor {
return record;
}
private void processAbsolutePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record, final String replacementValue) {
private Record processAbsolutePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record) {
final RecordPathResult replacementResult = replacementRecordPath.evaluate(record);
final Object replacementObject = getReplacementObject(replacementResult, replacementValue);
destinationFields.forEach(fieldVal -> fieldVal.updateValue(replacementObject));
final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList());
return updateRecord(destinationFieldValues, selectedFields, record);
}
private void processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record, final String replacementValue) {
destinationFields.forEach(fieldVal -> {
private Record processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, Record record) {
final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList());
for (final FieldValue fieldVal : destinationFieldValues) {
final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal);
final Object replacementObject = getReplacementObject(replacementResult, replacementValue);
final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
final Object replacementObject = getReplacementObject(selectedFields);
fieldVal.updateValue(replacementObject);
});
record = updateRecord(destinationFieldValues, selectedFields, record);
}
return record;
}
private Object getReplacementObject(final RecordPathResult recordPathResult, final String replacementValue) {
final List<FieldValue> selectedFields = recordPathResult.getSelectedFields().collect(Collectors.toList());
/**
 * Applies the selected replacement fields to the destination fields and returns the Record to continue with.
 *
 * <p>When there is exactly one destination and it has no parent record (per the change description, this is
 * the case where the root itself is the destination), the record is not updated in place; a Record derived
 * from the replacement is returned instead. In every other case each destination field is updated in place
 * and the given record is returned.</p>
 *
 * @param destinationFields the fields selected by the destination RecordPath
 * @param selectedFields the fields selected by the replacement RecordPath
 * @param record the record being processed
 * @return the given record, or a replacement Record when the single destination has no parent record
 */
private Record updateRecord(final List<FieldValue> destinationFields, final List<FieldValue> selectedFields, final Record record) {
    final boolean replacesWholeRecord = destinationFields.size() == 1
        && !destinationFields.get(0).getParentRecord().isPresent();

    if (!replacesWholeRecord) {
        // Ordinary case: write the replacement into each destination field.
        // The replacement object is obtained anew for every destination, exactly as before.
        destinationFields.forEach(destination -> destination.updateValue(getReplacementObject(selectedFields)));
        return record;
    }

    final Object replacement = getReplacementObject(selectedFields);
    if (replacement == null) {
        // Nothing to replace with, so keep the record as it is.
        return record;
    }

    if (replacement instanceof Record) {
        return (Record) replacement;
    }

    // The replacement is not itself a Record, so wrap the selected fields in a new Record
    // whose schema consists of exactly those fields.
    final List<RecordField> replacementFields = selectedFields.stream()
        .map(FieldValue::getField)
        .collect(Collectors.toList());
    final Record wrapper = new MapRecord(new SimpleRecordSchema(replacementFields), new HashMap<>());
    selectedFields.forEach(selected -> wrapper.setValue(selected.getField().getFieldName(), selected.getValue()));
    return wrapper;
}
private Object getReplacementObject(final List<FieldValue> selectedFields) {
if (selectedFields.size() > 1) {
throw new ProcessException("Cannot update Record because the Replacement Record Path \"" + replacementValue + "\" yielded "
+ selectedFields.size() + " results but this Processor only supports a single result.");
final List<RecordField> fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList());
final RecordSchema schema = new SimpleRecordSchema(fields);
final Record record = new MapRecord(schema, new HashMap<>());
for (final FieldValue fieldVal : selectedFields) {
record.setValue(fieldVal.getField().getFieldName(), fieldVal.getValue());
}
return record;
}
if (selectedFields.isEmpty()) {

View File

@ -244,4 +244,156 @@ public class TestUpdateRecord {
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json")));
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
/**
 * Sets the destination path {@code /} to the replacement RecordPath {@code /name/*} and checks that the
 * written output equals the {@code name-fields-only.json} fixture.
 *
 * <p>The reader uses the {@code person-with-name-record.avsc} schema and the writer uses the
 * {@code name-fields-only.avsc} schema. Fixture files are decoded explicitly as UTF-8 so that the
 * expectation does not depend on the JVM's default charset.</p>
 *
 * @throws InitializationException if a controller service cannot be added to the test runner
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testSetRootPathAbsoluteWithMultipleValues() throws InitializationException, IOException {
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);

    final String inputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);
    final String outputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);

    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
    runner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);

    runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));

    // Destination is "/", replacement is every child of "name"
    runner.setProperty("/", "/name/*");
    runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);

    runner.run();
    runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);

    final String expectedOutput = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json")),
        java.nio.charset.StandardCharsets.UTF_8);
    runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
/**
 * Sets the destination path {@code /} to the replacement RecordPath {@code /name} and checks that the
 * written output equals the {@code name-fields-only.json} fixture.
 *
 * <p>The reader uses the {@code person-with-name-record.avsc} schema and the writer uses the
 * {@code name-fields-only.avsc} schema. Fixture files are decoded explicitly as UTF-8 so that the
 * expectation does not depend on the JVM's default charset.</p>
 *
 * @throws InitializationException if a controller service cannot be added to the test runner
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testSetRootPathAbsoluteWithSingleValue() throws InitializationException, IOException {
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);

    final String inputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);
    final String outputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);

    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
    runner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);

    runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));

    // Destination is "/", replacement is the single "name" field
    runner.setProperty("/", "/name");
    runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);

    runner.run();
    runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);

    final String expectedOutput = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json")),
        java.nio.charset.StandardCharsets.UTF_8);
    runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
/**
 * Sets the destination path {@code /name/..} to the replacement RecordPath {@code /name/*} and checks that
 * the written output equals the {@code name-fields-only.json} fixture.
 *
 * <p>The reader uses the {@code person-with-name-record.avsc} schema and the writer uses the
 * {@code name-fields-only.avsc} schema. Fixture files are decoded explicitly as UTF-8 so that the
 * expectation does not depend on the JVM's default charset.</p>
 *
 * @throws InitializationException if a controller service cannot be added to the test runner
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testSetRootPathRelativeWithMultipleValues() throws InitializationException, IOException {
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);

    final String inputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);
    final String outputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);

    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
    runner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);

    runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));

    // Destination is reached through "name" and back up via "..", replacement is every child of "name"
    runner.setProperty("/name/..", "/name/*");
    runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);

    runner.run();
    runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);

    final String expectedOutput = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json")),
        java.nio.charset.StandardCharsets.UTF_8);
    runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
/**
 * Sets the destination path {@code /name/..} to the replacement RecordPath {@code /name} and checks that
 * the written output equals the {@code name-fields-only.json} fixture.
 *
 * <p>The reader uses the {@code person-with-name-record.avsc} schema and the writer uses the
 * {@code name-fields-only.avsc} schema. Fixture files are decoded explicitly as UTF-8 so that the
 * expectation does not depend on the JVM's default charset.</p>
 *
 * @throws InitializationException if a controller service cannot be added to the test runner
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testSetRootPathRelativeWithSingleValue() throws InitializationException, IOException {
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);

    final String inputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);
    final String outputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);

    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
    runner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);

    runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));

    // Destination is reached through "name" and back up via "..", replacement is the single "name" field
    runner.setProperty("/name/..", "/name");
    runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);

    runner.run();
    runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);

    final String expectedOutput = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json")),
        java.nio.charset.StandardCharsets.UTF_8);
    runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
/**
 * Sets the destination path {@code /name} to the replacement RecordPath {@code /mother} and checks that
 * the written output equals the {@code name-and-mother-same.json} fixture.
 *
 * <p>Reader and writer both use the {@code person-with-name-and-mother.avsc} schema, which is read from
 * disk once and shared. Fixture files are decoded explicitly as UTF-8 so that the expectation does not
 * depend on the JVM's default charset.</p>
 *
 * @throws InitializationException if a controller service cannot be added to the test runner
 * @throws IOException if a fixture file cannot be read
 */
@Test
public void testSetAbsolutePathWithAnotherRecord() throws InitializationException, IOException {
    final JsonTreeReader jsonReader = new JsonTreeReader();
    runner.addControllerService("reader", jsonReader);

    // Input and output use the same schema file, so load it a single time
    final String inputSchemaText = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc")),
        java.nio.charset.StandardCharsets.UTF_8);
    final String outputSchemaText = inputSchemaText;

    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
    runner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    runner.addControllerService("writer", jsonWriter);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
    runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
    runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
    runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
    runner.enableControllerService(jsonWriter);

    runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));

    // Destination is the "name" field, replacement is the "mother" field
    runner.setProperty("/name", "/mother");
    runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);

    runner.run();
    runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);

    final String expectedOutput = new String(
        Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-and-mother-same.json")),
        java.nio.charset.StandardCharsets.UTF_8);
    runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
}
}

View File

@ -3,5 +3,9 @@
"name": {
"last": "Doe",
"first": "John"
},
"mother": {
"last": "Doe",
"first": "Jane"
}
}

View File

@ -0,0 +1,11 @@
[ {
"id" : 485,
"name" : {
"last" : "Doe",
"first" : "Jane"
},
"mother" : {
"last" : "Doe",
"first" : "Jane"
}
} ]

View File

@ -0,0 +1,9 @@
{
"name": "nameFieldsOnly",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "last", "type": "string" },
{ "name": "first", "type": "string" }
]
}

View File

@ -0,0 +1,17 @@
{
"name": "personWithNameRecord",
"namespace": "nifi",
"type": "record",
"fields": [
{ "name": "id", "type": "int" },
{ "name": "name", "type": {
"type": "record",
"name": "nameRecord",
"fields": [
{ "name": "last", "type": "string" },
{ "name": "first", "type": "string" }
]
}},
{ "name": "mother", "type": "nifi.nameRecord" }
]
}