NIFI-4377: Added a fieldName() function to RecordPath and addressed an issue that caused //* to not work

NIFI-4377: Updated RecordPath Guide to include the new fieldName() function and improved unit test for the function

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2147.
This commit is contained in:
Mark Payne 2017-09-12 10:02:16 -04:00 committed by Pierre Villard
parent 1f1269c817
commit e52e9acc59
5 changed files with 158 additions and 0 deletions

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
public class FieldName extends RecordPathSegment {
private final RecordPathSegment recordPath;
public FieldName(final RecordPathSegment recordPath, final boolean absolute) {
super("fieldName", null, absolute);
this.recordPath = recordPath;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.map(fv -> new StandardFieldValue(fv.getField().getFieldName(), fv.getField(), fv.getParent().orElse(null)));
}
}

View File

@ -65,6 +65,7 @@ import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull;
import org.apache.nifi.record.path.functions.ReplaceRegex;
@ -163,6 +164,8 @@ public class RecordPathCompiler {
if (childTreeType == FIELD_NAME) {
final String descendantName = childTree.getChild(0).getText();
return new DescendantFieldPath(descendantName, parent, absolute);
} else if (childTreeType == WILDCARD) {
return new WildcardDescendantPath(parent, absolute);
} else {
throw new RecordPathException("Expected field name following '//' Token but found " + childTree);
}
@ -237,6 +240,10 @@ public class RecordPathCompiler {
return new Concat(argPaths, absolute);
}
case "fieldName": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new FieldName(args[0], absolute);
}
default: {
throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
+ "be used within a predicate, not as a standalone function");

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
public class WildcardDescendantPath extends RecordPathSegment {
WildcardDescendantPath(final RecordPathSegment parent, final boolean absolute) {
super("/*", parent, absolute);
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> parentResult = getParentPath().evaluate(context);
return parentResult
.flatMap(recordFieldVal -> findDescendants(recordFieldVal).stream());
}
private List<FieldValue> findDescendants(final FieldValue fieldValue) {
if (fieldValue == null || fieldValue.getValue() == null) {
return Collections.emptyList();
}
if (!Filters.isRecord(fieldValue)) {
return Collections.emptyList();
}
final Record record = (Record) fieldValue.getValue();
final List<FieldValue> matchingValues = new ArrayList<>();
for (final RecordField childField : record.getSchema().getFields()) {
final Object value = record.getValue(childField);
if (value == null) {
continue;
}
final FieldValue descendantFieldValue = new StandardFieldValue(value, childField, fieldValue);
matchingValues.add(descendantFieldValue);
if (Filters.isRecord(childField.getDataType(), value)) {
final FieldValue childFieldValue = new StandardFieldValue(value, childField, fieldValue);
matchingValues.addAll(findDescendants(childFieldValue));
}
}
return matchingValues;
}
}

View File

@ -999,6 +999,24 @@ public class TestRecordPath {
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testFieldName() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("name", RecordPath.compile("fieldName(/name)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("name", RecordPath.compile("fieldName(/*)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("//*[startsWith(fieldName(.), 'na')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("name", RecordPath.compile("fieldName(//*[startsWith(fieldName(.), 'na')])").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("//name[not(startsWith(fieldName(.), 'xyz'))]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("//name[not(startsWith(fieldName(.), 'n'))]").evaluate(record).getSelectedFields().count());
}
private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));

View File

@ -459,6 +459,23 @@ Concatenates all the arguments together.
|==========================================================
=== fieldName
Normally, when a path is given to a particular field in a Record, what is returned is the value of that field. It
can sometimes be useful, however, to obtain the name of the field instead of the value. To do this, we can use the
`fieldName` function.
|=====================================================================
| RecordPath | Return value
| `fieldName(//city/..)` | `workAddress` and `homeAddress`
| `//city[not(startsWith(fieldName(..), 'work'))]` | Jersey City
|=====================================================================
In the above example, the first RecordPath returns two separate field names: "workAddress" and "homeAddress". The second
RecordPath, in contrast, returns the value of a "city" field and uses the `fieldName` function as a predicate. The second
RecordPath finds a "city" field whose parent does not have a name that begins with "work". This means that it will return
the value of the "city" field whose parent is "homeAddress" but not the value of the "city" field whose parent is "workAddress".
[[filter_functions]]
== Filter Functions