NIFI-12773: Added join and anchored RecordPath function

Signed-off-by: Chris Sampson <chris.sampson82@gmail.com>

This closes #8391
This commit is contained in:
Mark Payne 2024-02-09 16:58:09 -05:00 committed by Chris Sampson
parent 01ca19eccc
commit 74bd798097
No known key found for this signature in database
GPG Key ID: 546AEB0826587237
6 changed files with 320 additions and 0 deletions

View File

@ -103,6 +103,10 @@
<artifactId>nifi-uuid5</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.Record;
import java.util.Arrays;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class Anchored extends RecordPathSegment {
private final RecordPathSegment anchorPath;
private final RecordPathSegment evaluationPath;
public Anchored(final RecordPathSegment anchorPath, final RecordPathSegment evaluationPath, final boolean absolute) {
super("anchored", null, absolute);
this.anchorPath = anchorPath;
this.evaluationPath = evaluationPath;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> anchoredStream = anchorPath.evaluate(context);
return anchoredStream.flatMap(fv -> {
final Object value = fv.getValue();
return evaluateFieldValue(value);
});
}
private Stream<FieldValue> evaluateFieldValue(final Object value) {
if (value == null) {
return Stream.of();
}
if (value instanceof Record) {
return evaluateAtRoot((Record) value);
}
if (value instanceof final Record[] array) {
return Arrays.stream(array).flatMap(this::evaluateAtRoot);
}
if (value instanceof final Iterable<?> iterable) {
return StreamSupport.stream(iterable.spliterator(), false).flatMap(element -> {
if (!(element instanceof Record)) {
return Stream.of();
}
return evaluateAtRoot((Record) element);
});
}
return Stream.of();
}
private Stream<FieldValue> evaluateAtRoot(final Record root) {
final RecordPathEvaluationContext recordPathEvaluateContext = new StandardRecordPathEvaluationContext(root);
return evaluationPath.evaluate(recordPathEvaluateContext);
}
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class Join extends RecordPathSegment {
private final RecordPathSegment delimiterPath;
private final RecordPathSegment[] valuePaths;
public Join(final RecordPathSegment delimiterPath, final RecordPathSegment[] valuePaths, final boolean absolute) {
super("join", null, absolute);
this.delimiterPath = delimiterPath;
this.valuePaths = valuePaths;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
String delimiter = RecordPathUtils.getFirstStringValue(delimiterPath, context);
if (delimiter == null) {
delimiter = "";
}
final List<String> values = new ArrayList<>();
for (final RecordPathSegment valuePath : valuePaths) {
final Stream<FieldValue> stream = valuePath.evaluate(context);
stream.forEach(fv -> {
final Object value = fv.getValue();
addStringValue(value, values);
});
}
final String joined = String.join(delimiter, values);
final RecordField field = new RecordField("join", RecordFieldType.STRING.getDataType());
final FieldValue responseValue = new StandardFieldValue(joined, field, null);
return Stream.of(responseValue);
}
private void addStringValue(final Object value, final List<String> values) {
if (value == null) {
values.add("null");
return;
}
if (value instanceof final Object[] array) {
for (final Object element : array) {
addStringValue(element, values);
}
} else if (value instanceof final Iterable<?> iterable) {
for (final Object element : iterable) {
addStringValue(element, values);
}
} else {
values.add(DataTypeUtils.toString(value, null));
}
}
}

View File

@ -35,6 +35,7 @@ import org.apache.nifi.record.path.filter.NotEqualsFilter;
import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Anchored;
import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Coalesce;
@ -45,6 +46,7 @@ import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.FilterFunction;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.Hash;
import org.apache.nifi.record.path.functions.Join;
import org.apache.nifi.record.path.functions.MapOf;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
@ -129,6 +131,10 @@ public class RecordPathCompiler {
}
case CHILD_REFERENCE: {
final Tree childTree = tree.getChild(0);
if (childTree == null) {
return new RootPath();
}
final int childTreeType = childTree.getType();
if (childTreeType == FIELD_NAME) {
final String childName = childTree.getChild(0).getText();
@ -404,6 +410,24 @@ public class RecordPathCompiler {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new Count(args[0], absolute);
}
case "join": {
final int numArgs = argumentListTree.getChildCount();
if (numArgs < 2) {
throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 2 or more arguments but got " + numArgs);
}
final RecordPathSegment[] joinPaths = new RecordPathSegment[numArgs - 1];
for (int i = 0; i < numArgs - 1; i++) {
joinPaths[i] = buildPath(argumentListTree.getChild(i + 1), null, absolute);
}
final RecordPathSegment delimiterPath = buildPath(argumentListTree.getChild(0), null, absolute);
return new Join(delimiterPath, joinPaths, absolute);
}
case "anchored": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Anchored(args[0], args[1], absolute);
}
case "not":
case "contains":
case "containsRegex":

View File

@ -1241,6 +1241,89 @@ public class TestRecordPath {
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testJoinWithTwoFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("fullName", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("firstName", RecordFieldType.LONG.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("lastName", "Doe");
values.put("firstName", "John");
final Record record = new MapRecord(schema, values);
assertEquals("Doe, John", RecordPath.compile("join(', ', /lastName, /firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testJoinWithArray() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("names", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("names", new String[] {"John", "Jane", "Jacob", "Judy"});
final Record record = new MapRecord(schema, values);
assertEquals("John,Jane,Jacob,Judy", RecordPath.compile("join(',', /names)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testJoinWithArrayAndMultipleFields() {
final List<RecordField> personFields = new ArrayList<>();
personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("friends", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
final Map<String, Object> values = new HashMap<>();
values.put("friends", new String[] {"John", "Jane", "Jacob", "Judy"});
values.put("firstName", "John");
values.put("lastName", "Doe");
final Record record = new MapRecord(personSchema, values);
assertEquals("Doe\nJohn\nJane\nJacob", RecordPath.compile("join('\\n', /lastName, /firstName, /friends[1..2])").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testAnchored() {
final List<RecordField> personFields = new ArrayList<>();
personFields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
personFields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType()));
final RecordSchema personSchema = new SimpleRecordSchema(personFields);
final List<RecordField> employeeFields = new ArrayList<>();
employeeFields.add(new RecordField("self", RecordFieldType.RECORD.getRecordDataType(personSchema)));
employeeFields.add(new RecordField("manager", RecordFieldType.RECORD.getRecordDataType(personSchema)));
employeeFields.add(new RecordField("directReports", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(personSchema))));
final RecordSchema employeeSchema = new SimpleRecordSchema(employeeFields);
final Record directReport1 = createPerson("John", "Doe", personSchema);
final Record directReport2 = createPerson("John", "Jingleheimer", personSchema);
final Record directReport3 = createPerson("John", "Jacob", personSchema);
final Record manager = createPerson("Jane", "Smith", personSchema);
final Record employee = new MapRecord(employeeSchema, Map.of(
"self", createPerson("John", "Schmidt", personSchema),
"manager", manager,
"directReports", new Record[] {directReport1, directReport2, directReport3}
));
assertEquals("John", RecordPath.compile("anchored(/directReports[0], /firstName)").evaluate(employee).getSelectedFields().findFirst().get().getValue());
assertEquals(List.of("John", "John", "John"), RecordPath.compile("anchored(/directReports, /firstName)").evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList());
assertEquals(List.of(), RecordPath.compile("anchored(/self/lastName, / )").evaluate(employee).getSelectedFields().map(FieldValue::getValue).toList());
}
private Record createPerson(final String firstName, final String lastName, final RecordSchema schema) {
final Map<String, Object> values = Map.of(
"firstName", firstName,
"lastName", lastName);
return new MapRecord(schema, values);
}
@Test
public void testMapOf() {
final List<RecordField> fields = new ArrayList<>();

View File

@ -456,6 +456,48 @@ Concatenates all the arguments together.
|==========================================================
=== join
Joins together multiple values with a separator.
|==========================================================
| RecordPath | Return value
| `join(', ', /workAddress/* )` | 123, 5th Avenue, New York, NY, 10020
|==========================================================
=== anchored
Allows evaluating a RecordPath while anchoring the root context to a child record.
|==========================================================
| RecordPath | Return value
| `anchored(/homeAddress, /city)` | Jersey City
|==========================================================
Additionally, this can be used in conjunction with arrays. For example, if we have the following record:
----
{
"id": "1234",
"elements": [{
"name": "book",
"color": "red"
}, {
"name": "computer",
"color": "black"
}]
}
----
We can evaluate hte following Record paths:
|==========================================================
| RecordPath | Return value
| `anchored(/elements, /name)` | The array containing `book` and `computer`
| `anchored(/elements, concat(/name, ': ', /color))` | The array containing `book: red` and `computer: black`
|==========================================================
=== fieldName
Normally, when a path is given to a particular field in a Record, what is returned is the value of that field. It