NIFI-8070: Added coalesce function to RecordPath

This commit is contained in:
Mark Payne 2020-12-03 16:05:26 -05:00 committed by markap14
parent 04aaf25131
commit d84583690f
4 changed files with 243 additions and 46 deletions

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import java.util.Optional;
import java.util.stream.Stream;
public class Coalesce extends RecordPathSegment {
private final RecordPathSegment[] valuePaths;
public Coalesce(final RecordPathSegment[] valuePaths, final boolean absolute) {
super("coalesce", null, absolute);
this.valuePaths = valuePaths;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
for (final RecordPathSegment valuePath : valuePaths) {
final Stream<FieldValue> stream = valuePath.evaluate(context);
final Optional<FieldValue> firstFieldValue = stream.findFirst();
if (firstFieldValue.isPresent()) {
// If the Optional is Present, it means that it found the field, but the value may still be explicitly null.
final FieldValue fieldValue = firstFieldValue.get();
if (fieldValue.getValue() != null) {
return Stream.of(firstFieldValue.get());
}
}
}
return Stream.empty();
}
}

View File

@ -17,6 +17,53 @@
package org.apache.nifi.record.path.paths; package org.apache.nifi.record.path.paths;
import org.antlr.runtime.tree.Tree;
import org.apache.nifi.record.path.NumericRange;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.filter.Contains;
import org.apache.nifi.record.path.filter.ContainsRegex;
import org.apache.nifi.record.path.filter.EndsWith;
import org.apache.nifi.record.path.filter.EqualsFilter;
import org.apache.nifi.record.path.filter.GreaterThanFilter;
import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter;
import org.apache.nifi.record.path.filter.IsBlank;
import org.apache.nifi.record.path.filter.IsEmpty;
import org.apache.nifi.record.path.filter.LessThanFilter;
import org.apache.nifi.record.path.filter.LessThanOrEqualFilter;
import org.apache.nifi.record.path.filter.MatchesRegex;
import org.apache.nifi.record.path.filter.NotEqualsFilter;
import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Coalesce;
import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.Hash;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull;
import org.apache.nifi.record.path.functions.ReplaceRegex;
import org.apache.nifi.record.path.functions.Substring;
import org.apache.nifi.record.path.functions.SubstringAfter;
import org.apache.nifi.record.path.functions.SubstringAfterLast;
import org.apache.nifi.record.path.functions.SubstringBefore;
import org.apache.nifi.record.path.functions.SubstringBeforeLast;
import org.apache.nifi.record.path.functions.ToBytes;
import org.apache.nifi.record.path.functions.ToDate;
import org.apache.nifi.record.path.functions.ToLowerCase;
import org.apache.nifi.record.path.functions.ToString;
import org.apache.nifi.record.path.functions.ToUpperCase;
import org.apache.nifi.record.path.functions.TrimString;
import org.apache.nifi.record.path.functions.UUID5;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import static org.apache.nifi.record.path.RecordPathParser.ARRAY_INDEX; import static org.apache.nifi.record.path.RecordPathParser.ARRAY_INDEX;
import static org.apache.nifi.record.path.RecordPathParser.CHILD_REFERENCE; import static org.apache.nifi.record.path.RecordPathParser.CHILD_REFERENCE;
import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD; import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD;
@ -42,52 +89,6 @@ import static org.apache.nifi.record.path.RecordPathParser.STRING_LIST;
import static org.apache.nifi.record.path.RecordPathParser.STRING_LITERAL; import static org.apache.nifi.record.path.RecordPathParser.STRING_LITERAL;
import static org.apache.nifi.record.path.RecordPathParser.WILDCARD; import static org.apache.nifi.record.path.RecordPathParser.WILDCARD;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import org.antlr.runtime.tree.Tree;
import org.apache.nifi.record.path.NumericRange;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.filter.Contains;
import org.apache.nifi.record.path.filter.ContainsRegex;
import org.apache.nifi.record.path.filter.EndsWith;
import org.apache.nifi.record.path.filter.EqualsFilter;
import org.apache.nifi.record.path.filter.GreaterThanFilter;
import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter;
import org.apache.nifi.record.path.filter.IsBlank;
import org.apache.nifi.record.path.filter.IsEmpty;
import org.apache.nifi.record.path.filter.LessThanFilter;
import org.apache.nifi.record.path.filter.LessThanOrEqualFilter;
import org.apache.nifi.record.path.filter.MatchesRegex;
import org.apache.nifi.record.path.filter.NotEqualsFilter;
import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.Hash;
import org.apache.nifi.record.path.functions.PadLeft;
import org.apache.nifi.record.path.functions.PadRight;
import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull;
import org.apache.nifi.record.path.functions.ReplaceRegex;
import org.apache.nifi.record.path.functions.Substring;
import org.apache.nifi.record.path.functions.SubstringAfter;
import org.apache.nifi.record.path.functions.SubstringAfterLast;
import org.apache.nifi.record.path.functions.SubstringBefore;
import org.apache.nifi.record.path.functions.SubstringBeforeLast;
import org.apache.nifi.record.path.functions.ToBytes;
import org.apache.nifi.record.path.functions.ToDate;
import org.apache.nifi.record.path.functions.ToLowerCase;
import org.apache.nifi.record.path.functions.ToString;
import org.apache.nifi.record.path.functions.ToUpperCase;
import org.apache.nifi.record.path.functions.TrimString;
import org.apache.nifi.record.path.functions.UUID5;
public class RecordPathCompiler { public class RecordPathCompiler {
public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) { public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) {
@ -343,6 +344,16 @@ public class RecordPathCompiler {
return new UUID5(args[0], null, absolute); return new UUID5(args[0], null, absolute);
} }
} }
case "coalesce": {
final int numArgs = argumentListTree.getChildCount();
final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
for (int i = 0; i < numArgs; i++) {
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
}
return new Coalesce(argPaths, absolute);
}
default: { default: {
throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only " throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
+ "be used within a predicate, not as a standalone function"); + "be used within a predicate, not as a standalone function");

View File

@ -48,6 +48,7 @@ import java.util.stream.IntStream;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -1215,6 +1216,65 @@ public class TestRecordPath {
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue()); assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
} }
@Test
public void testCoalesce() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", "1234");
values.put("name", null);
Record record = new MapRecord(schema, values);
final RecordPath recordPath = RecordPath.compile("coalesce(/id, /name)");
// Test where the first value is populated
FieldValue fieldValue = recordPath.evaluate(record).getSelectedFields().findFirst().get();
assertEquals("1234", fieldValue.getValue());
assertEquals("id", fieldValue.getField().getFieldName());
// Test different value populated
values.clear();
values.put("id", null);
values.put("name", "John Doe");
record = new MapRecord(schema, values);
fieldValue = recordPath.evaluate(record).getSelectedFields().findFirst().get();
assertEquals("John Doe", fieldValue.getValue());
assertEquals("name", fieldValue.getField().getFieldName());
// Test all null
values.clear();
values.put("id", null);
values.put("name", null);
record = new MapRecord(schema, values);
assertFalse(recordPath.evaluate(record).getSelectedFields().findFirst().isPresent());
// Test none is null
values.clear();
values.put("id", "1234");
values.put("name", "John Doe");
record = new MapRecord(schema, values);
fieldValue = recordPath.evaluate(record).getSelectedFields().findFirst().get();
assertEquals("1234", fieldValue.getValue());
assertEquals("id", fieldValue.getField().getFieldName());
// Test missing field
values.clear();
values.put("name", "John Doe");
record = new MapRecord(schema, values);
fieldValue = recordPath.evaluate(record).getSelectedFields().findFirst().get();
assertEquals("John Doe", fieldValue.getValue());
assertEquals("name", fieldValue.getField().getFieldName());
}
private Record getCaseTestRecord() { private Record getCaseTestRecord() {
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("middleName", RecordFieldType.STRING.getDataType())); fields.add(new RecordField("middleName", RecordFieldType.STRING.getDataType()));

View File

@ -561,6 +561,79 @@ The following record path would convert the String field into a byte array using
`toBytes( /s, "UTF-16")` `toBytes( /s, "UTF-16")`
=== coalesce
Returns the first value from the given arguments that is non-null. For example, given a record such as:
----
{
"id": null,
"name": "John Doe"
}
----
The following record path would return "John Doe":
`coalesce(/id, /name)`
Given the record:
----
{
"id": "1234",
"name": null
}
----
The same record path would return "1234".
Given the record:
----
{
"id": null,
"name": null
}
----
The record path would return `null`.
Given the record:
----
{
"id": "null",
"name": "John Doe"
}
----
The record path would return the String "null". Note here the very important difference in that the `id`
field does not have a null value but rather the value of the field is the literal string "null".
Given the record:
----
{
"name": null
}
----
The record path would return `null`. Given that the `id` field is not present, it is treated as a `null` value.
Given the record:
----
{
"id": "1234",
"name": "John Doe"
}
----
The record path would return "1234". However, the record path `coalesce(/name, /id)` would return "John Doe" because
both fields given are non-null, so the `coalesce` function returns the first value that is referenced in its arguments,
not the first value that is encountered in the Record itself.
=== format === format
Converts a Date to a String in the given format with the given time zone(optional, default time zone is GMT). Converts a Date to a String in the given format with the given time zone(optional, default time zone is GMT).