mirror of https://github.com/apache/nifi.git
NIFI-13468 Add standalone RecordPath function recordOf
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #9018.
This commit is contained in:
parent
a7112a60ec
commit
6ac5a96dd0
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.record.path.functions;
|
||||
|
||||
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

import static org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType;
|
||||
|
||||
public class RecordOf extends RecordPathSegment {
|
||||
private final RecordPathSegment[] valuePaths;
|
||||
|
||||
public RecordOf(final RecordPathSegment[] valuePaths, final boolean absolute) {
|
||||
super("recordOf", null, absolute);
|
||||
this.valuePaths = valuePaths;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
|
||||
final List<RecordField> fields = new ArrayList<>();
|
||||
final Map<String, Object> values = new HashMap<>();
|
||||
|
||||
for (int i = 0; i + 1 < valuePaths.length; i += 2) {
|
||||
final String fieldName = valuePaths[i].evaluate(context).findFirst().orElseThrow().toString();
|
||||
final FieldValue fieldValueProvider = valuePaths[i + 1].evaluate(context).findFirst().orElseThrow();
|
||||
|
||||
final Object fieldValue = fieldValueProvider.getValue();
|
||||
|
||||
final RecordField referencedField = fieldValueProvider.getField();
|
||||
final DataType fieldDataType = referencedField != null
|
||||
? referencedField.getDataType() : inferDataType(fieldValue, RecordFieldType.STRING.getDataType());
|
||||
|
||||
fields.add(new RecordField(fieldName, fieldDataType));
|
||||
values.put(fieldName, fieldValue);
|
||||
}
|
||||
|
||||
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||
final Record record = new MapRecord(schema, values);
|
||||
final RecordField recordField = new RecordField("recordOf", RecordFieldType.RECORD.getRecordDataType(schema));
|
||||
|
||||
final FieldValue responseValue = new StandardFieldValue(record, recordField, null);
|
||||
return Stream.of(responseValue);
|
||||
}
|
||||
}
|
|
@ -50,6 +50,7 @@ import org.apache.nifi.record.path.functions.Join;
|
|||
import org.apache.nifi.record.path.functions.MapOf;
|
||||
import org.apache.nifi.record.path.functions.PadLeft;
|
||||
import org.apache.nifi.record.path.functions.PadRight;
|
||||
import org.apache.nifi.record.path.functions.RecordOf;
|
||||
import org.apache.nifi.record.path.functions.Replace;
|
||||
import org.apache.nifi.record.path.functions.ReplaceNull;
|
||||
import org.apache.nifi.record.path.functions.ReplaceRegex;
|
||||
|
@ -295,6 +296,20 @@ public class RecordPathCompiler {
|
|||
|
||||
return new MapOf(argPaths, absolute);
|
||||
}
|
||||
case "recordOf": {
|
||||
final int numArgs = argumentListTree.getChildCount();
|
||||
|
||||
if (numArgs % 2 != 0) {
|
||||
throw new RecordPathException("The recordOf function requires an even number of arguments");
|
||||
}
|
||||
|
||||
final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
|
||||
for (int i = 0; i < numArgs; i++) {
|
||||
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
|
||||
}
|
||||
|
||||
return new RecordOf(argPaths, absolute);
|
||||
}
|
||||
case "toLowerCase": {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||
return new ToLowerCase(args[0], absolute);
|
||||
|
|
|
@ -1417,6 +1417,144 @@ public class TestRecordPath {
|
|||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
class RecordOf {
|
||||
@Test
|
||||
public void createsRecordFromReferencedFields() {
|
||||
assertRecordOf(
|
||||
"recordOf('mappedLong', /id, 'mappedString', /name)",
|
||||
Map.of("id", "mappedLong", "name", "mappedString")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void throwsRecordPathExceptionWhenPassedAnOddAmountOfArguments() {
|
||||
assertThrows(RecordPathException.class, () -> RecordPath.compile("recordOf('firstName', /firstName, 'lastName')").evaluate(record));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportsReferencesToFieldsOfTypeMap() {
|
||||
assertRecordOf(
|
||||
"recordOf('mappedMap', /attributes)",
|
||||
Map.of("attributes", "mappedMap")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportsReferencesToFieldsOfTypeArray() {
|
||||
assertRecordOf(
|
||||
"recordOf('mappedArray', /bytes)",
|
||||
Map.of("bytes", "mappedArray")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportsReferencesToFieldsOfTypeRecord() {
|
||||
assertRecordOf(
|
||||
"recordOf('mappedRecord', /mainAccount)",
|
||||
Map.of("mainAccount", "mappedRecord")
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportsPathReferenceToMissingValue() {
|
||||
final Map<String, DataType> expectedFieldTypes = Map.of(
|
||||
"missingValue", record.getSchema().getDataType("missing").orElseThrow(),
|
||||
"nonExisting", choiceTypeOf(RecordFieldType.STRING, RecordFieldType.RECORD) // fallback used when field is not defined in source
|
||||
);
|
||||
final Map<String, Object> expectedFieldValues = new HashMap<>();
|
||||
expectedFieldValues.put("nonExisting", null);
|
||||
|
||||
assertRecordOf(
|
||||
"recordOf('missingValue', /missing, 'nonExisting', /nonExistingField)",
|
||||
expectedFieldTypes,
|
||||
expectedFieldValues
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportsCreatingRecordWithFieldNameFromPathReference() {
|
||||
final Map<String, DataType> expectedFieldTypes = Map.of(
|
||||
"John", RecordFieldType.STRING.getDataType()
|
||||
);
|
||||
final Map<String, Object> expectedFieldValues = Map.of(
|
||||
"John", "Doe"
|
||||
);
|
||||
|
||||
assertRecordOf(
|
||||
"recordOf(/firstName, /lastName)",
|
||||
expectedFieldTypes,
|
||||
expectedFieldValues
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void supportsCreatingRecordFromLiteralValue() {
|
||||
final Map<String, DataType> expectedFieldTypes = Map.of(
|
||||
"aNumber", RecordFieldType.INT.getDataType(),
|
||||
"aString", RecordFieldType.STRING.getDataType()
|
||||
);
|
||||
final Map<String, Object> expectedFieldValues = Map.of(
|
||||
"aNumber", 2012,
|
||||
"aString", "aValue"
|
||||
);
|
||||
|
||||
assertRecordOf(
|
||||
"recordOf('aNumber', 2012, 'aString', 'aValue')",
|
||||
expectedFieldTypes,
|
||||
expectedFieldValues
|
||||
);
|
||||
}
|
||||
|
||||
private void assertRecordOf(final String path, final Map<String, String> originalToMappedFieldNames) {
|
||||
final Map<String, DataType> expectedFieldTypes = originalToMappedFieldNames.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
Map.Entry::getValue,
|
||||
originalToMappedFieldName -> {
|
||||
final String originalFieldName = originalToMappedFieldName.getKey();
|
||||
return record.getSchema().getDataType(originalFieldName).orElseThrow();
|
||||
}
|
||||
));
|
||||
final Map<String, Object> expectedFieldValues = originalToMappedFieldNames.entrySet().stream()
|
||||
.collect(Collectors.toMap(
|
||||
Map.Entry::getValue,
|
||||
originalToMappedFieldName -> {
|
||||
final String originalFieldName = originalToMappedFieldName.getKey();
|
||||
return record.getValue(originalFieldName);
|
||||
}
|
||||
));
|
||||
|
||||
assertRecordOf(path, expectedFieldTypes, expectedFieldValues);
|
||||
}
|
||||
|
||||
private void assertRecordOf(
|
||||
final String path,
|
||||
final Map<String, DataType> expectedFieldTypes,
|
||||
final Map<String, Object> expectedFieldValues
|
||||
) {
|
||||
final FieldValue result = evaluateSingleFieldValue(path, record);
|
||||
|
||||
assertEquals(RecordFieldType.RECORD, result.getField().getDataType().getFieldType());
|
||||
|
||||
final Object fieldValue = result.getValue();
|
||||
assertInstanceOf(Record.class, fieldValue);
|
||||
final Record recordValue = (Record) fieldValue;
|
||||
|
||||
assertAll(Stream.concat(
|
||||
expectedFieldTypes.entrySet().stream().map(expectation -> () -> {
|
||||
final DataType expectedFieldType = expectation.getValue();
|
||||
final RecordField actualRecordField =
|
||||
recordValue.getSchema().getField(expectation.getKey()).orElseThrow();
|
||||
|
||||
assertEquals(expectedFieldType, actualRecordField.getDataType());
|
||||
}),
|
||||
expectedFieldValues.entrySet().stream().map(expectation ->
|
||||
() -> assertEquals(expectation.getValue(), recordValue.getValue(expectation.getKey()))
|
||||
)
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
class Replace {
|
||||
@Test
|
||||
|
@ -2911,7 +3049,7 @@ public class TestRecordPath {
|
|||
entry("firstName", "John"),
|
||||
entry("lastName", "Doe"),
|
||||
entry("name", "John Doe"),
|
||||
// field "missing" is missing purposel)y
|
||||
// field "missing" is missing purposely
|
||||
entry("date", "2017-10-20T11:00:00Z"),
|
||||
entry("attributes", new HashMap<>(Map.of(
|
||||
"city", "New York",
|
||||
|
|
|
@ -1219,6 +1219,46 @@ And that would give us something like:
|
|||
|
||||
This function requires an even number of arguments and the record paths must represent simple field values.
|
||||
|
||||
=== recordOf
|
||||
|
||||
Creates a nested record with the given parameters. For example, if we have the following record:
|
||||
|
||||
----
|
||||
{
|
||||
"firstName": "Alice",
|
||||
"lastName": "Koopa",
|
||||
"age": 30,
|
||||
"hobbies": ["reading", "hiking", "coding"],
|
||||
"address": {
|
||||
"street": "123 Main St",
|
||||
"city": "Anytown",
|
||||
"state": "CA"
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
We could use the `UpdateRecord` processor with
|
||||
|
||||
----
|
||||
/profile => recordOf("name", /firstName, "location", /address/city, "hobbies", /hobbies, /age, "years old")
|
||||
----
|
||||
|
||||
And that would give us something like:
|
||||
|
||||
----
|
||||
{
|
||||
"name": "Alice",
|
||||
"hobbies": ["reading", "hiking", "coding"],
|
||||
"location": "Anytown",
|
||||
"30": "years old"
|
||||
}
|
||||
----
|
||||
|
||||
This function requires an even number of arguments.
|
||||
Each pair of arguments represents a field in the new record.
Every odd argument, the first one of each pair, is used as the field name and is coerced into a String value.
Every even argument, the second one of each pair, is used as the field value.
|
||||
|
||||
[[filter_functions]]
|
||||
== Filter Functions
|
||||
|
||||
|
|
Loading…
Reference in New Issue