mirror of https://github.com/apache/nifi.git
NIFI-3952: Updated UpdateRecord to pass field-related variables to the Expression Language
This closes #1836. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
1a3c525dd6
commit
6d16fdf170
|
@ -457,6 +457,7 @@
|
||||||
<exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude>
|
<exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude>
|
||||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude>
|
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude>
|
||||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude>
|
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude>
|
||||||
|
<exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude>
|
||||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
|
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
|
||||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
|
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
|
||||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
|
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
|
||||||
|
|
|
@ -20,21 +20,24 @@ package org.apache.nifi.processors.standard;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||||
|
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.SeeAlso;
|
import org.apache.nifi.annotation.documentation.SeeAlso;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||||
import org.apache.nifi.components.AllowableValue;
|
import org.apache.nifi.components.AllowableValue;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.PropertyValue;
|
||||||
import org.apache.nifi.components.ValidationContext;
|
import org.apache.nifi.components.ValidationContext;
|
||||||
import org.apache.nifi.components.ValidationResult;
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
|
@ -47,6 +50,7 @@ import org.apache.nifi.record.path.util.RecordPathCache;
|
||||||
import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
|
import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator;
|
||||||
import org.apache.nifi.serialization.record.Record;
|
import org.apache.nifi.serialization.record.Record;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||||
|
|
||||||
|
|
||||||
@EventDriven
|
@EventDriven
|
||||||
|
@ -60,12 +64,16 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
+ "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.")
|
+ "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.")
|
||||||
@SeeAlso({ConvertRecord.class})
|
@SeeAlso({ConvertRecord.class})
|
||||||
public class UpdateRecord extends AbstractRecordProcessor {
|
public class UpdateRecord extends AbstractRecordProcessor {
|
||||||
|
private static final String FIELD_NAME = "field.name";
|
||||||
|
private static final String FIELD_VALUE = "field.value";
|
||||||
|
private static final String FIELD_TYPE = "field.type";
|
||||||
|
|
||||||
private volatile RecordPathCache recordPathCache;
|
private volatile RecordPathCache recordPathCache;
|
||||||
private volatile List<String> recordPaths;
|
private volatile List<String> recordPaths;
|
||||||
|
|
||||||
static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value",
|
static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value",
|
||||||
"The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with.");
|
"The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with. Expression Language "
|
||||||
|
+ "may reference variables 'field.name', 'field.type', and 'field.value' to access information about the field and the value of the field being evaluated.");
|
||||||
static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value",
|
static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value",
|
||||||
"The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path "
|
"The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path "
|
||||||
+ "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, "
|
+ "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, "
|
||||||
|
@ -142,8 +150,8 @@ public class UpdateRecord extends AbstractRecordProcessor {
|
||||||
final RecordPath recordPath = recordPathCache.getCompiled(recordPathText);
|
final RecordPath recordPath = recordPathCache.getCompiled(recordPathText);
|
||||||
final RecordPathResult result = recordPath.evaluate(record);
|
final RecordPathResult result = recordPath.evaluate(record);
|
||||||
|
|
||||||
final String replacementValue = context.getProperty(recordPathText).evaluateAttributeExpressions(flowFile).getValue();
|
|
||||||
if (evaluateValueAsRecordPath) {
|
if (evaluateValueAsRecordPath) {
|
||||||
|
final String replacementValue = context.getProperty(recordPathText).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
final RecordPath replacementRecordPath = recordPathCache.getCompiled(replacementValue);
|
final RecordPath replacementRecordPath = recordPathCache.getCompiled(replacementValue);
|
||||||
|
|
||||||
// If we have an Absolute RecordPath, we need to evaluate the RecordPath only once against the Record.
|
// If we have an Absolute RecordPath, we need to evaluate the RecordPath only once against the Record.
|
||||||
|
@ -154,7 +162,18 @@ public class UpdateRecord extends AbstractRecordProcessor {
|
||||||
processRelativePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue);
|
processRelativePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue));
|
final PropertyValue replacementValue = context.getProperty(recordPathText);
|
||||||
|
final Map<String, String> fieldVariables = new HashMap<>(4);
|
||||||
|
|
||||||
|
result.getSelectedFields().forEach(fieldVal -> {
|
||||||
|
fieldVariables.clear();
|
||||||
|
fieldVariables.put(FIELD_NAME, fieldVal.getField().getFieldName());
|
||||||
|
fieldVariables.put(FIELD_VALUE, DataTypeUtils.toString(fieldVal.getValue(), (String) null));
|
||||||
|
fieldVariables.put(FIELD_TYPE, fieldVal.getField().getDataType().getFieldType().name());
|
||||||
|
|
||||||
|
final String evaluatedReplacementVal = replacementValue.evaluateAttributeExpressions(flowFile, fieldVariables).getValue();
|
||||||
|
fieldVal.updateValue(evaluatedReplacementVal);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -328,5 +328,63 @@
|
||||||
these simple type coercions for us.
|
these simple type coercions for us.
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
<h3>Example 5 - Use Expression Language to Modify Value</h3>
|
||||||
|
|
||||||
|
<p>
|
||||||
|
This example will capitalize the value of all 'name' fields, regardless of
|
||||||
|
where in the Record hierarchy the field is found. This is done by referencing the 'field.value' variable in the Expression Language.
|
||||||
|
We can also access the field.name variable and the field.type variable.
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<table>
|
||||||
|
<tr>
|
||||||
|
<th>Property Name</th>
|
||||||
|
<th>Property Value</th>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>Replacement Value Strategy</td>
|
||||||
|
<td>Literal Value</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>//name</td>
|
||||||
|
<td>${field.value:toUpper()}</td>
|
||||||
|
</tr>
|
||||||
|
</table>
|
||||||
|
|
||||||
|
<p>
|
||||||
|
This will yield the following output:
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<code>
|
||||||
|
<pre>
|
||||||
|
[{
|
||||||
|
"id": 17,
|
||||||
|
"name": "JOHN",
|
||||||
|
"child": {
|
||||||
|
"id": "1"
|
||||||
|
},
|
||||||
|
"siblingIds": [4, 8],
|
||||||
|
"siblings": [
|
||||||
|
{ "name": "JEREMY", "id": 4 },
|
||||||
|
{ "name": "JULIA", "id": 8 }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": 98,
|
||||||
|
"name": "JANE",
|
||||||
|
"child": {
|
||||||
|
"id": 2
|
||||||
|
},
|
||||||
|
"gender": "F",
|
||||||
|
"siblingIds": [],
|
||||||
|
"siblings": []
|
||||||
|
}]
|
||||||
|
</pre>
|
||||||
|
</code>
|
||||||
|
|
||||||
|
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
|
@ -214,4 +214,34 @@ public class TestUpdateRecord {
|
||||||
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json")));
|
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json")));
|
||||||
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFieldValuesInEL() throws InitializationException, IOException {
|
||||||
|
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||||
|
runner.addControllerService("reader", jsonReader);
|
||||||
|
|
||||||
|
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")));
|
||||||
|
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")));
|
||||||
|
|
||||||
|
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||||
|
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
|
||||||
|
runner.enableControllerService(jsonReader);
|
||||||
|
|
||||||
|
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
|
||||||
|
runner.addControllerService("writer", jsonWriter);
|
||||||
|
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||||
|
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
||||||
|
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
|
||||||
|
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||||
|
runner.enableControllerService(jsonWriter);
|
||||||
|
|
||||||
|
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json"));
|
||||||
|
runner.setProperty("/name/last", "${field.value:toUpper()}");
|
||||||
|
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.LITERAL_VALUES);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
|
||||||
|
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json")));
|
||||||
|
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
[ {
|
||||||
|
"id" : 485,
|
||||||
|
"name" : {
|
||||||
|
"last" : "DOE",
|
||||||
|
"first" : "John"
|
||||||
|
}
|
||||||
|
} ]
|
Loading…
Reference in New Issue