NIFI-4009: Added support for several key functions in RecordPath

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #1881
This commit is contained in:
Mark Payne 2017-05-31 12:17:28 -04:00 committed by Matt Burgess
parent 0bddcfe730
commit 32314d70fd
33 changed files with 1912 additions and 58 deletions

View File

@ -69,6 +69,8 @@ CHILD_SEPARATOR : '/';
DESCENDANT_SEPARATOR : '//'; DESCENDANT_SEPARATOR : '//';
LBRACKET : '['; LBRACKET : '[';
RBRACKET : ']'; RBRACKET : ']';
LPAREN : '(';
RPAREN : ')';
NUMBER : '-'? ('0'..'9')+; NUMBER : '-'? ('0'..'9')+;
QUOTE : '\''; QUOTE : '\'';
COMMA : ','; COMMA : ',';
@ -92,9 +94,20 @@ WHITESPACE : SPACE+ { skip(); };
fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C'; fragment SPACE : ' ' | '\t' | '\n' | '\r' | '\u000C';
RAW_FIELD_NAME : ( // filter functions
~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>') CONTAINS : 'contains';
~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ')* CONTAINS_REGEX : 'containsRegex';
ENDS_WITH : 'endsWith';
STARTS_WITH : 'startsWith';
IS_BLANK : 'isBlank';
IS_EMPTY : 'isEmpty';
MATCHES_REGEX : 'matchesRegex';
NOT : 'not';
IDENTIFIER : (
~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '0'..'9' | ' ' | '.' | '-' | '=' | '?' | '<' | '>' | '(' | ')' )
~('/' | '[' | ']' | '*' | '"' | '\'' | ',' | '\t' | '\r' | '\n' | '=' | '?' | '<' | '>' | ' ' | '(' | ')' )*
); );
// STRINGS // STRINGS

View File

@ -38,6 +38,8 @@ tokens {
PREDICATE; PREDICATE;
OPERATOR; OPERATOR;
RELATIVE_PATH; RELATIVE_PATH;
FUNCTION;
ARGUMENTS;
} }
@header { @header {
@ -90,7 +92,7 @@ multipleStringLiterals : STRING_LITERAL (COMMA! STRING_LITERAL)*;
stringList : multipleStringLiterals -> stringList : multipleStringLiterals ->
^(STRING_LIST multipleStringLiterals); ^(STRING_LIST multipleStringLiterals);
rawOrLiteral : RAW_FIELD_NAME | STRING_LITERAL; rawOrLiteral : IDENTIFIER | STRING_LITERAL;
@ -118,6 +120,7 @@ index : LBRACKET! indexOrKey RBRACKET!;
// //
// Predicates // Predicates
// //
@ -125,13 +128,52 @@ operator : LESS_THAN | LESS_THAN_EQUAL | GREATER_THAN | GREATER_THAN_EQUAL | EQU
literal : NUMBER | STRING_LITERAL; literal : NUMBER | STRING_LITERAL;
expression : path | literal; expression : path | literal | function;
operation : relativePath operator^ expression; operation : expression operator^ expression;
predicate : LBRACKET operation RBRACKET -> filter : filterFunction | operation;
^(PREDICATE operation);
predicate : LBRACKET filter RBRACKET ->
^(PREDICATE filter);
//
// Functions
//
argument : expression;
optionalArgument : argument?;
argumentList : optionalArgument (COMMA argument)* ->
^(ARGUMENTS optionalArgument argument*);
function : IDENTIFIER LPAREN argumentList RPAREN ->
^(FUNCTION IDENTIFIER argumentList);
filterFunctionNames : CONTAINS | CONTAINS_REGEX | ENDS_WITH | STARTS_WITH | IS_BLANK | IS_EMPTY | MATCHES_REGEX;
filterArgument : expression | filterFunction;
optionalFilterArgument : filterArgument?;
filterArgumentList : optionalFilterArgument (COMMA filterArgument)* ->
^(ARGUMENTS optionalFilterArgument filterArgument*);
simpleFilterFunction : filterFunctionNames LPAREN filterArgumentList RPAREN ->
^(FUNCTION filterFunctionNames filterArgumentList);
simpleFilterFunctionOrOperation : simpleFilterFunction | operation;
notFunctionArgList : simpleFilterFunctionOrOperation ->
^(ARGUMENTS simpleFilterFunctionOrOperation);
notFilterFunction : NOT LPAREN notFunctionArgList RPAREN ->
^(FUNCTION NOT notFunctionArgList);
filterFunction : simpleFilterFunction | notFilterFunction;
@ -191,5 +233,7 @@ relativePath : currentOrParent relativePathSegment? ->
path : absolutePath | relativePath; path : absolutePath | relativePath;
pathExpression : path EOF -> pathOrFunction : path | function;
^(PATH_EXPRESSION path);
pathExpression : pathOrFunction EOF ->
^(PATH_EXPRESSION pathOrFunction);

View File

@ -54,10 +54,11 @@ public interface RecordPath {
* against a Record via {@link #evaluate(Record)} and then have a Relative RecordPath evaluated against * against a Record via {@link #evaluate(Record)} and then have a Relative RecordPath evaluated against
* the results. This method will throw an Exception if this RecordPath is an Absolute RecordPath. * the results. This method will throw an Exception if this RecordPath is an Absolute RecordPath.
* *
* @param record the Record to evaluate
* @param contextNode the context node that represents where in the Record the 'current node' or 'context node' is * @param contextNode the context node that represents where in the Record the 'current node' or 'context node' is
* @return a RecordPathResult that contains a FieldValue for each field that matches * @return a RecordPathResult that contains a FieldValue for each field that matches
*/ */
RecordPathResult evaluate(FieldValue contextNode); RecordPathResult evaluate(Record record, FieldValue contextNode);
/** /**
* Indicates whether the RecordPath is an Absolute Path (starts with a '/' character) or a Relative Path (starts with a '.' character). * Indicates whether the RecordPath is an Absolute Path (starts with a '/' character) or a Relative Path (starts with a '.' character).

View File

@ -34,7 +34,7 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter {
} }
@Override @Override
public Stream<FieldValue> filter(final FieldValue currentNode, final RecordPathEvaluationContext context) { public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
final Stream<FieldValue> rhsStream = rhs.evaluate(context); final Stream<FieldValue> rhsStream = rhs.evaluate(context);
final Optional<FieldValue> firstMatch = rhsStream final Optional<FieldValue> firstMatch = rhsStream
.filter(fieldVal -> fieldVal.getValue() != null) .filter(fieldVal -> fieldVal.getValue() != null)
@ -48,7 +48,10 @@ public abstract class BinaryOperatorFilter implements RecordPathFilter {
final Object value = fieldValue.getValue(); final Object value = fieldValue.getValue();
final Stream<FieldValue> lhsStream = lhs.evaluate(context); final Stream<FieldValue> lhsStream = lhs.evaluate(context);
return lhsStream.filter(fieldVal -> test(fieldVal, value)); return lhsStream.filter(fieldVal -> {
final boolean result = test(fieldVal, value);
return invert ? !result : result;
});
} }
@Override @Override

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import org.apache.nifi.record.path.paths.RecordPathSegment;
public class Contains extends StringComparisonFilter {
public Contains(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
super(recordPath, searchValuePath);
}
@Override
protected boolean isMatch(final String fieldValue, final String comparison) {
return fieldValue.contains(comparison);
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.LiteralValuePath;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class ContainsRegex extends FunctionFilter {
private final RecordPathSegment regexPath;
private final Pattern compiledPattern;
public ContainsRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) {
super(recordPath);
this.regexPath = regexPath;
if (regexPath instanceof LiteralValuePath) {
final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get();
final Object value = fieldValue.getValue();
final String regex = DataTypeUtils.toString(value, (String) null);
compiledPattern = Pattern.compile(regex);
} else {
compiledPattern = null;
}
}
@Override
protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
final Pattern pattern;
if (compiledPattern == null) {
final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst();
if (!fieldValueOption.isPresent()) {
return false;
}
final Object value = fieldValueOption.get().getValue();
if (value == null) {
return false;
}
final String regex = DataTypeUtils.toString(value, (String) null);
pattern = Pattern.compile(regex);
} else {
pattern = compiledPattern;
}
final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
if (searchString == null) {
return false;
}
return pattern.matcher(searchString).find();
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import org.apache.nifi.record.path.paths.RecordPathSegment;
public class EndsWith extends StringComparisonFilter {
public EndsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
super(recordPath, searchValuePath);
}
@Override
protected boolean isMatch(final String fieldValue, final String comparison) {
return fieldValue.endsWith(comparison);
}
}

View File

@ -37,10 +37,10 @@ public class EqualsFilter extends BinaryOperatorFilter {
if (rhsValue instanceof Number) { if (rhsValue instanceof Number) {
return compareNumbers((Number) lhsValue, (Number) rhsValue); return compareNumbers((Number) lhsValue, (Number) rhsValue);
} else { } else {
return false; return lhsValue.toString().equals(rhsValue.toString());
} }
} else if (rhsValue instanceof Number) { } else if (rhsValue instanceof Number) {
return false; return lhsValue.toString().equals(rhsValue.toString());
} }
return lhsValue.equals(rhsValue); return lhsValue.equals(rhsValue);

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
public abstract class FunctionFilter implements RecordPathFilter {
private final RecordPathSegment recordPath;
protected FunctionFilter(final RecordPathSegment recordPath) {
this.recordPath = recordPath;
}
@Override
public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
return recordPath.evaluate(context)
.filter(fv -> invert ? !test(fv, context) : test(fv, context));
}
protected abstract boolean test(FieldValue fieldValue, final RecordPathEvaluationContext context);
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class IsBlank extends FunctionFilter {
public IsBlank(RecordPathSegment recordPath) {
super(recordPath);
}
@Override
protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
if (fieldVal == null) {
return true;
}
return fieldVal.trim().isEmpty();
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class IsEmpty extends FunctionFilter {
public IsEmpty(RecordPathSegment recordPath) {
super(recordPath);
}
@Override
protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
if (fieldVal == null) {
return true;
}
return fieldVal.isEmpty();
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.LiteralValuePath;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class MatchesRegex extends FunctionFilter {
private final RecordPathSegment regexPath;
private final Pattern compiledPattern;
public MatchesRegex(RecordPathSegment recordPath, final RecordPathSegment regexPath) {
super(recordPath);
this.regexPath = regexPath;
if (regexPath instanceof LiteralValuePath) {
final FieldValue fieldValue = ((LiteralValuePath) regexPath).evaluate((RecordPathEvaluationContext) null).findFirst().get();
final Object value = fieldValue.getValue();
final String regex = DataTypeUtils.toString(value, (String) null);
compiledPattern = Pattern.compile(regex);
} else {
compiledPattern = null;
}
}
@Override
protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
final Pattern pattern;
if (compiledPattern == null) {
final Optional<FieldValue> fieldValueOption = regexPath.evaluate(context).findFirst();
if (!fieldValueOption.isPresent()) {
return false;
}
final Object value = fieldValueOption.get().getValue();
if (value == null) {
return false;
}
final String regex = DataTypeUtils.toString(value, (String) null);
pattern = Pattern.compile(regex);
} else {
pattern = compiledPattern;
}
final String searchString = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
if (searchString == null) {
return false;
}
return pattern.matcher(searchString).matches();
}
}

View File

@ -30,7 +30,7 @@ public class NotEqualsFilter extends BinaryOperatorFilter {
protected boolean test(final FieldValue fieldValue, final Object rhsValue) { protected boolean test(final FieldValue fieldValue, final Object rhsValue) {
final Object lhsValue = fieldValue.getValue(); final Object lhsValue = fieldValue.getValue();
if (lhsValue == null) { if (lhsValue == null) {
return rhsValue != null; return false;
} }
if (lhsValue instanceof Number) { if (lhsValue instanceof Number) {

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
public class NotFilter implements RecordPathFilter {
private final RecordPathFilter filter;
public NotFilter(final RecordPathFilter filter) {
this.filter = filter;
}
@Override
public Stream<FieldValue> filter(final RecordPathEvaluationContext context, final boolean invert) {
return filter.filter(context, !invert);
}
}

View File

@ -24,6 +24,6 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext;
public interface RecordPathFilter { public interface RecordPathFilter {
Stream<FieldValue> filter(FieldValue currentNode, RecordPathEvaluationContext context); Stream<FieldValue> filter(RecordPathEvaluationContext context, boolean invert);
} }

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import org.apache.nifi.record.path.paths.RecordPathSegment;
public class StartsWith extends StringComparisonFilter {
public StartsWith(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
super(recordPath, searchValuePath);
}
@Override
protected boolean isMatch(final String fieldValue, final String comparison) {
return fieldValue.startsWith(comparison);
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.filter;
import java.util.Optional;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public abstract class StringComparisonFilter extends FunctionFilter {
private final RecordPathSegment searchValuePath;
public StringComparisonFilter(RecordPathSegment recordPath, final RecordPathSegment searchValuePath) {
super(recordPath);
this.searchValuePath = searchValuePath;
}
@Override
protected boolean test(final FieldValue fieldValue, final RecordPathEvaluationContext context) {
final String fieldVal = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
if (fieldVal == null) {
return false;
}
final Optional<FieldValue> firstValue = searchValuePath.evaluate(context).findFirst();
if (!firstValue.isPresent()) {
return false;
}
final String searchValue = DataTypeUtils.toString(firstValue.get().getValue(), (String) null);
if (searchValue == null) {
return false;
}
return isMatch(fieldVal, searchValue);
}
protected abstract boolean isMatch(String fieldValue, String comparison);
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class Concat extends RecordPathSegment {
private final RecordPathSegment[] valuePaths;
public Concat(final RecordPathSegment[] valuePaths, final boolean absolute) {
super("concat", null, absolute);
this.valuePaths = valuePaths;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
Stream<FieldValue> concatenated = Stream.empty();
for (final RecordPathSegment valuePath : valuePaths) {
final Stream<FieldValue> stream = valuePath.evaluate(context);
concatenated = Stream.concat(concatenated, stream);
}
final StringBuilder sb = new StringBuilder();
concatenated.forEach(fv -> sb.append(DataTypeUtils.toString(fv.getValue(), (String) null)));
final RecordField field = new RecordField("concat", RecordFieldType.STRING.getDataType());
final FieldValue responseValue = new StandardFieldValue(sb.toString(), field, null);
return Stream.of(responseValue);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class Replace extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment searchValuePath;
private final RecordPathSegment replacementValuePath;
public Replace(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) {
super("replace", null, absolute);
this.recordPath = recordPath;
this.searchValuePath = searchValue;
this.replacementValuePath = replacementValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
if (searchValue == null) {
return fv;
}
final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context);
if (replacementValue == null) {
return fv;
}
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
final String replaced = value.replace(searchValue, replacementValue);
return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
public class ReplaceNull extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment replacementValuePath;
public ReplaceNull(final RecordPathSegment recordPath, final RecordPathSegment replacementValue, final boolean absolute) {
super("replaceNull", null, absolute);
this.recordPath = recordPath;
this.replacementValuePath = replacementValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues
.map(fv -> {
if (fv.getValue() != null) {
return fv;
}
final Optional<FieldValue> replacementOption = replacementValuePath.evaluate(context).findFirst();
if (!replacementOption.isPresent()) {
return fv;
}
final FieldValue replacementFieldValue = replacementOption.get();
final Object replacementValue = replacementFieldValue.getValue();
if (replacementValue == null) {
return fv;
}
return new StandardFieldValue(replacementValue, fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.LiteralValuePath;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class ReplaceRegex extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment searchValuePath;
private final RecordPathSegment replacementValuePath;
private final Pattern compiledPattern;
public ReplaceRegex(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final RecordPathSegment replacementValue, final boolean absolute) {
super("replaceRegex", null, absolute);
this.recordPath = recordPath;
this.searchValuePath = searchValue;
if (searchValue instanceof LiteralValuePath) {
final FieldValue fieldValue = ((LiteralValuePath) searchValue).evaluate((RecordPathEvaluationContext) null).findFirst().get();
final Object value = fieldValue.getValue();
final String regex = DataTypeUtils.toString(value, (String) null);
compiledPattern = Pattern.compile(regex);
} else {
compiledPattern = null;
}
this.replacementValuePath = replacementValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
// Determine the Replacement Value
final String replacementValue = RecordPathUtils.getFirstStringValue(replacementValuePath, context);
if (replacementValue == null) {
return fv;
}
final Pattern pattern;
if (compiledPattern == null) {
final Optional<FieldValue> fieldValueOption = searchValuePath.evaluate(context).findFirst();
if (!fieldValueOption.isPresent()) {
return fv;
}
final Object fieldValue = fieldValueOption.get().getValue();
if (value == null) {
return fv;
}
final String regex = DataTypeUtils.toString(fieldValue, (String) null);
pattern = Pattern.compile(regex);
} else {
pattern = compiledPattern;
}
final String replaced = pattern.matcher(value).replaceAll(replacementValue);
return new StandardFieldValue(replaced, fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class Substring extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment startIndexPath;
private final RecordPathSegment endIndexPath;
public Substring(final RecordPathSegment recordPath, final RecordPathSegment startIndex, final RecordPathSegment endIndex, final boolean absolute) {
super("substring", null, absolute);
this.recordPath = recordPath;
this.startIndexPath = startIndex;
this.endIndexPath = endIndex;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final OptionalInt startIndex = getIndex(startIndexPath, context);
if (!startIndex.isPresent()) {
return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
}
final OptionalInt endIndex = getIndex(endIndexPath, context);
if (!endIndex.isPresent()) {
return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
}
final int start = startIndex.getAsInt();
final int end = endIndex.getAsInt();
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
// Allow for negative indices to be used to reference offset from string length. We add 1 here because we want -1 to refer
// to the actual length of the string.
final int evaluatedEndIndex = end < 0 ? value.length() + 1 + end : end;
final int evaluatedStartIndex = start < 0 ? value.length() + 1 + start : start;
if (evaluatedEndIndex <= evaluatedStartIndex || evaluatedStartIndex < 0 || evaluatedStartIndex > value.length()) {
return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
}
final String substring = value.substring(evaluatedStartIndex, Math.min(evaluatedEndIndex, value.length()));
return new StandardFieldValue(substring, fv.getField(), fv.getParent().orElse(null));
});
}
private OptionalInt getIndex(final RecordPathSegment indexSegment, final RecordPathEvaluationContext context) {
final Optional<FieldValue> firstFieldValueOption = indexSegment.evaluate(context).findFirst();
if (!firstFieldValueOption.isPresent()) {
return OptionalInt.empty();
}
final FieldValue fieldValue = firstFieldValueOption.get();
final Object indexObject = fieldValue.getValue();
if (!DataTypeUtils.isIntegerTypeCompatible(indexObject)) {
return OptionalInt.empty();
}
final String fieldName;
final RecordField field = fieldValue.getField();
fieldName = field == null ? "<Unknown Field>" : field.getFieldName();
return OptionalInt.of(DataTypeUtils.toInteger(indexObject, fieldName));
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class SubstringAfter extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment searchValuePath;
public SubstringAfter(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
super("substringAfter", null, absolute);
this.recordPath = recordPath;
this.searchValuePath = searchValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
if (searchValue == null || searchValue.isEmpty()) {
return fv;
}
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
final int index = value.indexOf(searchValue);
if (index < 0) {
return fv;
}
if (value.length() < index + 1) {
return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
}
return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class SubstringAfterLast extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment searchValuePath;
public SubstringAfterLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
super("substringAfterLast", null, absolute);
this.recordPath = recordPath;
this.searchValuePath = searchValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
if (searchValue == null || searchValue.isEmpty()) {
return fv;
}
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
final int index = value.lastIndexOf(searchValue);
if (index < 0) {
return fv;
}
if (value.length() < index + 1) {
return new StandardFieldValue("", fv.getField(), fv.getParent().orElse(null));
}
return new StandardFieldValue(value.substring(index + 1), fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class SubstringBefore extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment searchValuePath;
public SubstringBefore(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
super("substringBefore", null, absolute);
this.recordPath = recordPath;
this.searchValuePath = searchValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
if (searchValue == null || searchValue.isEmpty()) {
return fv;
}
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
final int index = value.indexOf(searchValue);
if (index < 0) {
return fv;
}
return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import java.util.stream.Stream;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.record.path.util.RecordPathUtils;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class SubstringBeforeLast extends RecordPathSegment {
private final RecordPathSegment recordPath;
private final RecordPathSegment searchValuePath;
public SubstringBeforeLast(final RecordPathSegment recordPath, final RecordPathSegment searchValue, final boolean absolute) {
super("substringBeforeLast", null, absolute);
this.recordPath = recordPath;
this.searchValuePath = searchValue;
}
@Override
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
return fieldValues.filter(fv -> fv.getValue() != null)
.map(fv -> {
final String searchValue = RecordPathUtils.getFirstStringValue(searchValuePath, context);
if (searchValue == null || searchValue.isEmpty()) {
return fv;
}
final String value = DataTypeUtils.toString(fv.getValue(), (String) null);
final int index = value.lastIndexOf(searchValue);
if (index < 0) {
return fv;
}
return new StandardFieldValue(value.substring(0, index), fv.getField(), fv.getParent().orElse(null));
});
}
}

View File

@ -43,17 +43,12 @@ public class PredicatePath extends RecordPathSegment {
context.setContextNode(fieldVal); context.setContextNode(fieldVal);
try { try {
// Really what we want to do is filter out Stream<FieldValue> but that becomes very difficult // Really what we want to do is filter out Stream<FieldValue> but that becomes very difficult
// to implement for the RecordPathFilter's. So, instead, we pass the FieldValue to field and // to implement for the RecordPathFilter's. So, instead, we pass
// the RecordPathEvaluationContext and receive back a Stream<FieldValue>. Since this is a Predicate, // the RecordPathEvaluationContext and receive back a Stream<FieldValue>. Since this is a Predicate,
// though, we don't want to transform our Stream - we just want to filter it. So we handle this by // though, we don't want to transform our Stream - we just want to filter it. So we handle this by
// mapping the result back to fieldVal. And since this predicate shouldn't return the same field multiple // mapping the result back to fieldVal. And since this predicate shouldn't return the same field multiple
// times, we will limit the stream to 1 element. We also filter out any FieldValue whose value is null. // times, we will limit the stream to 1 element.
// This is done because if we have a predicate like [./iDoNotExist != 'hello'] then the relative path will return filter.filter(context, false)
// return a value of null and that will be compared to 'hello'. Since they are not equal, the NotEqualsFilter
// will return 'true', so we will get back a FieldValue with a null value. This should not make the Predicate
// true.
return filter.filter(fieldVal, context)
.filter(fv -> fv.getValue() != null)
.limit(1) .limit(1)
.map(ignore -> fieldVal); .map(ignore -> fieldVal);
} finally { } finally {

View File

@ -23,6 +23,7 @@ import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD;
import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE; import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE;
import static org.apache.nifi.record.path.RecordPathParser.EQUAL; import static org.apache.nifi.record.path.RecordPathParser.EQUAL;
import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME; import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME;
import static org.apache.nifi.record.path.RecordPathParser.FUNCTION;
import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN; import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN;
import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL; import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL;
import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN; import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN;
@ -48,17 +49,38 @@ import java.util.function.BiFunction;
import org.antlr.runtime.tree.Tree; import org.antlr.runtime.tree.Tree;
import org.apache.nifi.record.path.NumericRange; import org.apache.nifi.record.path.NumericRange;
import org.apache.nifi.record.path.exception.RecordPathException; import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.filter.Contains;
import org.apache.nifi.record.path.filter.ContainsRegex;
import org.apache.nifi.record.path.filter.EndsWith;
import org.apache.nifi.record.path.filter.EqualsFilter; import org.apache.nifi.record.path.filter.EqualsFilter;
import org.apache.nifi.record.path.filter.GreaterThanFilter; import org.apache.nifi.record.path.filter.GreaterThanFilter;
import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter; import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter;
import org.apache.nifi.record.path.filter.IsBlank;
import org.apache.nifi.record.path.filter.IsEmpty;
import org.apache.nifi.record.path.filter.LessThanFilter; import org.apache.nifi.record.path.filter.LessThanFilter;
import org.apache.nifi.record.path.filter.LessThanOrEqualFilter; import org.apache.nifi.record.path.filter.LessThanOrEqualFilter;
import org.apache.nifi.record.path.filter.MatchesRegex;
import org.apache.nifi.record.path.filter.NotEqualsFilter; import org.apache.nifi.record.path.filter.NotEqualsFilter;
import org.apache.nifi.record.path.filter.NotFilter;
import org.apache.nifi.record.path.filter.RecordPathFilter; import org.apache.nifi.record.path.filter.RecordPathFilter;
import org.apache.nifi.record.path.filter.StartsWith;
import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.Replace;
import org.apache.nifi.record.path.functions.ReplaceNull;
import org.apache.nifi.record.path.functions.ReplaceRegex;
import org.apache.nifi.record.path.functions.Substring;
import org.apache.nifi.record.path.functions.SubstringAfter;
import org.apache.nifi.record.path.functions.SubstringAfterLast;
import org.apache.nifi.record.path.functions.SubstringBefore;
import org.apache.nifi.record.path.functions.SubstringBeforeLast;
public class RecordPathCompiler { public class RecordPathCompiler {
public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) { public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) {
if (pathTree.getType() == FUNCTION) {
return buildPath(pathTree, null, absolute);
}
RecordPathSegment parent = root; RecordPathSegment parent = root;
for (int i = 0; i < pathTree.getChildCount(); i++) { for (int i = 0; i < pathTree.getChildCount(); i++) {
final Tree child = pathTree.getChild(i); final Tree child = pathTree.getChild(i);
@ -168,6 +190,59 @@ public class RecordPathCompiler {
case PATH: { case PATH: {
return compile(tree, new RootPath(), absolute); return compile(tree, new RootPath(), absolute);
} }
case FUNCTION: {
final String functionName = tree.getChild(0).getText();
final Tree argumentListTree = tree.getChild(1);
switch (functionName) {
case "substring": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
return new Substring(args[0], args[1], args[2], absolute);
}
case "substringAfter": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new SubstringAfter(args[0], args[1], absolute);
}
case "substringAfterLast": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new SubstringAfterLast(args[0], args[1], absolute);
}
case "substringBefore": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new SubstringBefore(args[0], args[1], absolute);
}
case "substringBeforeLast": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new SubstringBeforeLast(args[0], args[1], absolute);
}
case "replace": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
return new Replace(args[0], args[1], args[2], absolute);
}
case "replaceRegex": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 3, functionName, absolute);
return new ReplaceRegex(args[0], args[1], args[2], absolute);
}
case "replaceNull": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ReplaceNull(args[0], args[1], absolute);
}
case "concat": {
final int numArgs = argumentListTree.getChildCount();
final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
for (int i = 0; i < numArgs; i++) {
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
}
return new Concat(argPaths, absolute);
}
default: {
throw new RecordPathException("Invalid function call: The '" + functionName + "' function does not exist or can only "
+ "be used within a predicate, not as a standalone function");
}
}
}
} }
throw new RecordPathException("Encountered unexpected token " + tree); throw new RecordPathException("Encountered unexpected token " + tree);
@ -187,6 +262,8 @@ public class RecordPathCompiler {
return createBinaryOperationFilter(operatorTree, parent, GreaterThanFilter::new, absolute); return createBinaryOperationFilter(operatorTree, parent, GreaterThanFilter::new, absolute);
case GREATER_THAN_EQUAL: case GREATER_THAN_EQUAL:
return createBinaryOperationFilter(operatorTree, parent, GreaterThanOrEqualFilter::new, absolute); return createBinaryOperationFilter(operatorTree, parent, GreaterThanOrEqualFilter::new, absolute);
case FUNCTION:
return createFunctionFilter(operatorTree, absolute);
default: default:
throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree); throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree);
} }
@ -200,4 +277,66 @@ public class RecordPathCompiler {
final RecordPathSegment rhsPath = buildPath(rhsTree, parent, absolute); final RecordPathSegment rhsPath = buildPath(rhsTree, parent, absolute);
return function.apply(lhsPath, rhsPath); return function.apply(lhsPath, rhsPath);
} }
private static RecordPathFilter createFunctionFilter(final Tree functionTree, final boolean absolute) {
final String functionName = functionTree.getChild(0).getText();
final Tree argumentListTree = functionTree.getChild(1);
switch (functionName) {
case "contains": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Contains(args[0], args[1]);
}
case "matchesRegex": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new MatchesRegex(args[0], args[1]);
}
case "containsRegex": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new ContainsRegex(args[0], args[1]);
}
case "startsWith": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new StartsWith(args[0], args[1]);
}
case "endsWith": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new EndsWith(args[0], args[1]);
}
case "isEmpty": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new IsEmpty(args[0]);
}
case "isBlank": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new IsBlank(args[0]);
}
case "not": {
final int numArgs = argumentListTree.getChildCount();
if (numArgs != 1) {
throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes 1 argument but got " + numArgs);
}
final Tree childTree = argumentListTree.getChild(0);
final RecordPathFilter childFilter = createFilter(childTree, null, absolute);
return new NotFilter(childFilter);
}
}
throw new RecordPathException("Invalid function name: " + functionName);
}
private static RecordPathSegment[] getArgPaths(final Tree argumentListTree, final int expectedCount, final String functionName, final boolean absolute) {
final int numArgs = argumentListTree.getChildCount();
if (numArgs != expectedCount) {
throw new RecordPathException("Invalid number of arguments: " + functionName + " function takes " + expectedCount + " arguments but got " + numArgs);
}
final RecordPathSegment[] argPaths = new RecordPathSegment[expectedCount];
for (int i = 0; i < expectedCount; i++) {
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
}
return argPaths;
}
} }

View File

@ -25,7 +25,6 @@ import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathEvaluationContext; import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.RecordPathResult; import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.StandardRecordPathEvaluationContext; import org.apache.nifi.record.path.StandardRecordPathEvaluationContext;
import org.apache.nifi.record.path.util.Filters;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
public abstract class RecordPathSegment implements RecordPath { public abstract class RecordPathSegment implements RecordPath {
@ -33,7 +32,7 @@ public abstract class RecordPathSegment implements RecordPath {
private final RecordPathSegment parentPath; private final RecordPathSegment parentPath;
private final boolean absolute; private final boolean absolute;
RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) { public RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) {
this.path = path; this.path = path;
this.parentPath = parentPath; this.parentPath = parentPath;
this.absolute = absolute; this.absolute = absolute;
@ -98,34 +97,8 @@ public abstract class RecordPathSegment implements RecordPath {
} }
@Override @Override
public final RecordPathResult evaluate(final FieldValue contextNode) { public final RecordPathResult evaluate(final Record record, final FieldValue contextNode) {
final RecordPathEvaluationContext context; final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(record);
if (Filters.isRecord(contextNode.getField().getDataType(), contextNode.getValue())) {
final Record record = (Record) contextNode.getValue();
if (record == null) {
return new RecordPathResult() {
@Override
public String getPath() {
return RecordPathSegment.this.getPath();
}
@Override
public Stream<FieldValue> getSelectedFields() {
return Stream.empty();
}
};
}
context = new StandardRecordPathEvaluationContext(record);
} else {
final FieldValue parent = contextNode.getParent().orElse(null);
if (parent == null) {
context = new StandardRecordPathEvaluationContext(null);
} else {
context = new StandardRecordPathEvaluationContext(parent.getParentRecord().orElse(null));
}
}
context.setContextNode(contextNode); context.setContextNode(contextNode);
final Stream<FieldValue> selectedFields = evaluate(context); final Stream<FieldValue> selectedFields = evaluate(context);

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.util;
import java.util.Optional;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
public class RecordPathUtils {
public static String getFirstStringValue(final RecordPathSegment segment, final RecordPathEvaluationContext context) {
final Optional<FieldValue> stringFieldValue = segment.evaluate(context).findFirst();
if (!stringFieldValue.isPresent()) {
return null;
}
final String stringValue = DataTypeUtils.toString(stringFieldValue.get().getValue(), (String) null);
if (stringValue == null) {
return null;
}
return stringValue;
}
}

View File

@ -28,6 +28,7 @@ import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MapRecord;
@ -45,6 +46,19 @@ public class TestRecordPath {
System.out.println(RecordPath.compile("/person[2]")); System.out.println(RecordPath.compile("/person[2]"));
System.out.println(RecordPath.compile("//person[2]")); System.out.println(RecordPath.compile("//person[2]"));
System.out.println(RecordPath.compile("/person/child[1]//sibling/name")); System.out.println(RecordPath.compile("/person/child[1]//sibling/name"));
// contains is a 'filter function' so can be used as the predicate
RecordPath.compile("/name[contains(., 'hello')]");
// substring is not a filter function so cannot be used as a predicate
try {
RecordPath.compile("/name[substring(., 1, 2)]");
} catch (final RecordPathException e) {
// expected
}
// substring is not a filter function so can be used as *part* of a predicate but not as the entire predicate
RecordPath.compile("/name[substring(., 1, 2) = 'e']");
} }
@Test @Test
@ -682,7 +696,7 @@ public class TestRecordPath {
final FieldValue recordFieldValue = new StandardFieldValue(record, new RecordField("record", RecordFieldType.RECORD.getDataType()), null); final FieldValue recordFieldValue = new StandardFieldValue(record, new RecordField("record", RecordFieldType.RECORD.getDataType()), null);
final List<FieldValue> fieldValues = RecordPath.compile("./name").evaluate(recordFieldValue).getSelectedFields().collect(Collectors.toList()); final List<FieldValue> fieldValues = RecordPath.compile("./name").evaluate(record, recordFieldValue).getSelectedFields().collect(Collectors.toList());
assertEquals(1, fieldValues.size()); assertEquals(1, fieldValues.size());
final FieldValue fieldValue = fieldValues.get(0); final FieldValue fieldValue = fieldValues.get(0);
@ -702,7 +716,7 @@ public class TestRecordPath {
final FieldValue recordFieldValue = new StandardFieldValue(record, new RecordField("root", RecordFieldType.RECORD.getRecordDataType(record.getSchema())), null); final FieldValue recordFieldValue = new StandardFieldValue(record, new RecordField("root", RecordFieldType.RECORD.getRecordDataType(record.getSchema())), null);
final FieldValue nameFieldValue = new StandardFieldValue("John Doe", new RecordField("name", RecordFieldType.STRING.getDataType()), recordFieldValue); final FieldValue nameFieldValue = new StandardFieldValue("John Doe", new RecordField("name", RecordFieldType.STRING.getDataType()), recordFieldValue);
final List<FieldValue> fieldValues = RecordPath.compile(".").evaluate(nameFieldValue).getSelectedFields().collect(Collectors.toList()); final List<FieldValue> fieldValues = RecordPath.compile(".").evaluate(record, nameFieldValue).getSelectedFields().collect(Collectors.toList());
assertEquals(1, fieldValues.size()); assertEquals(1, fieldValues.size());
final FieldValue fieldValue = fieldValues.get(0); final FieldValue fieldValue = fieldValues.get(0);
@ -714,6 +728,277 @@ public class TestRecordPath {
assertEquals("Jane Doe", record.getValue("name")); assertEquals("Jane Doe", record.getValue("name"));
} }
@Test
public void testSubstringFunction() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
final FieldValue fieldValue = RecordPath.compile("substring(/name, 0, 4)").evaluate(record).getSelectedFields().findFirst().get();
assertEquals("John", fieldValue.getValue());
assertEquals("John", RecordPath.compile("substring(/name, 0, -5)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("", RecordPath.compile("substring(/name, 1000, 1005)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("", RecordPath.compile("substring(/name, 4, 3)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substring(/name, 0, 10000)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("", RecordPath.compile("substring(/name, -50, -1)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testSubstringBeforeFunction() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("John", RecordPath.compile("substringBefore(/name, ' ')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringBefore(/name, 'XYZ')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringBefore(/name, '')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John D", RecordPath.compile("substringBeforeLast(/name, 'o')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringBeforeLast(/name, 'XYZ')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringBeforeLast(/name, '')").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testSubstringAfterFunction() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("hn Doe", RecordPath.compile("substringAfter(/name, 'o')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringAfter(/name, 'XYZ')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringAfter(/name, '')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("e", RecordPath.compile("substringAfterLast(/name, 'o')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringAfterLast(/name, 'XYZ')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("substringAfterLast(/name, '')").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testContains() {
final Record record = createSimpleRecord();
assertEquals("John Doe", RecordPath.compile("/name[contains(., 'o')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[contains(., 'x')]").evaluate(record).getSelectedFields().count());
record.setValue("name", "John Doe 48");
assertEquals("John Doe 48", RecordPath.compile("/name[contains(., /id)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testStartsWith() {
final Record record = createSimpleRecord();
assertEquals("John Doe", RecordPath.compile("/name[startsWith(., 'J')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[startsWith(., 'x')]").evaluate(record).getSelectedFields().count());
assertEquals("John Doe", RecordPath.compile("/name[startsWith(., '')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testEndsWith() {
final Record record = createSimpleRecord();
assertEquals("John Doe", RecordPath.compile("/name[endsWith(., 'e')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[endsWith(., 'x')]").evaluate(record).getSelectedFields().count());
assertEquals("John Doe", RecordPath.compile("/name[endsWith(., '')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testIsEmpty() {
final Record record = createSimpleRecord();
assertEquals("John Doe", RecordPath.compile("/name[isEmpty(../missing)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("/name[isEmpty(/missing)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[isEmpty(../id)]").evaluate(record).getSelectedFields().count());
record.setValue("missing", " ");
assertEquals(0L, RecordPath.compile("/name[isEmpty(/missing)]").evaluate(record).getSelectedFields().count());
}
@Test
public void testIsBlank() {
final Record record = createSimpleRecord();
assertEquals("John Doe", RecordPath.compile("/name[isBlank(../missing)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
record.setValue("missing", " ");
assertEquals("John Doe", RecordPath.compile("/name[isBlank(../missing)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("/name[isBlank(/missing)]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[isBlank(../id)]").evaluate(record).getSelectedFields().count());
}
@Test
public void testContainsRegex() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("John Doe", RecordPath.compile("/name[containsRegex(., 'o')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("/name[containsRegex(., '[xo]')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[containsRegex(., 'x')]").evaluate(record).getSelectedFields().count());
}
@Test
public void testNot() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("John Doe", RecordPath.compile("/name[not(contains(., 'x'))]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[not(. = 'John Doe')]").evaluate(record).getSelectedFields().count());
assertEquals("John Doe", RecordPath.compile("/name[not(. = 'Jane Doe')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testChainingFunctions() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("John Doe", RecordPath.compile("/name[contains(substringAfter(., 'o'), 'h')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testMatchesRegex() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals(0L, RecordPath.compile("/name[matchesRegex(., 'John D')]").evaluate(record).getSelectedFields().count());
assertEquals("John Doe", RecordPath.compile("/name[matchesRegex(., '[John Doe]{8}')]").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testReplace() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("John Doe", RecordPath.compile("/name[replace(../id, 48, 18) = 18]").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(0L, RecordPath.compile("/name[replace(../id, 48, 18) = 48]").evaluate(record).getSelectedFields().count());
assertEquals("Jane Doe", RecordPath.compile("replace(/name, 'ohn', 'ane')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("replace(/name, 'ohnny', 'ane')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John 48", RecordPath.compile("replace(/name, 'Doe', /id)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("23", RecordPath.compile("replace(/id, 48, 23)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testReplaceRegex() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals("ohn oe", RecordPath.compile("replaceRegex(/name, '[JD]', '')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, 'ohnny', 'ane')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("11", RecordPath.compile("replaceRegex(/id, '[0-9]', 1)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("Jxohn Dxoe", RecordPath.compile("replaceRegex(/name, '([JD])', '$1x')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("Jxohn Dxoe", RecordPath.compile("replaceRegex(/name, '(?<hello>[JD])', '${hello}x')").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals("48ohn 48oe", RecordPath.compile("replaceRegex(/name, '(?<hello>[JD])', /id)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testReplaceNull() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("missing", RecordFieldType.LONG.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
assertEquals(48, RecordPath.compile("replaceNull(/missing, /id)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(14, RecordPath.compile("replaceNull(/missing, 14)").evaluate(record).getSelectedFields().findFirst().get().getValue());
assertEquals(48, RecordPath.compile("replaceNull(/id, 14)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
@Test
public void testConcat() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("fullName", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("firstName", RecordFieldType.LONG.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("lastName", "Doe");
values.put("firstName", "John");
final Record record = new MapRecord(schema, values);
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
}
private List<RecordField> getDefaultFields() { private List<RecordField> getDefaultFields() {
final List<RecordField> fields = new ArrayList<>(); final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType())); fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
@ -738,4 +1023,19 @@ public class TestRecordPath {
return accountSchema; return accountSchema;
} }
private Record createSimpleRecord() {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("missing", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
final Map<String, Object> values = new HashMap<>();
values.put("id", 48);
values.put("name", "John Doe");
final Record record = new MapRecord(schema, values);
return record;
}
} }

View File

@ -157,6 +157,15 @@ when we reference an Array field and want to only reference some of the elements
specific entries in the Map; or when we want to reference a Record only if it adheres to some criteria. We can accomplish this by providing our criteria to the specific entries in the Map; or when we want to reference a Record only if it adheres to some criteria. We can accomplish this by providing our criteria to the
RecordPath within square brackets (using the `[` and `]` characters). We will go over each of these cases below. RecordPath within square brackets (using the `[` and `]` characters). We will go over each of these cases below.
[[function_usage]]
== Function Usage
In addition to retrieving a field from a Record, as outlined above in the <<filters>> section, we sometimes need to refine which fields we want to select. Or we
may want to return a modified version of a field. To do this, we rely on functions. The syntax for a function is <function name> <open parenthesis> <args> <close parenthesis>,
where <args> represents one or more arguments separated by commas. An argument may be a string literal (such as `'hello'`) or a number literal (such as `48`), or could be
a relative or absolute RecordPath (such as `./name` or `/id`). Additionally, we can use functions within a filter. For example, we could use a RecordPath such as
`/person[ isEmpty('name') ]/id` to retrieve the `id` field of any person whose name is empty. A listing of functions that are available and their corresponding documentation
can be found below in the <<functions>> section.
[[arrays]] [[arrays]]
=== Arrays === Arrays
@ -291,3 +300,246 @@ value of the `preferredState` field.
Additionally, we can write a RecordPath that references the "city" field of any record whose state is "NJ" by using the parent operator (`..`): `/*/city[../state = 'NJ']`. Additionally, we can write a RecordPath that references the "city" field of any record whose state is "NJ" by using the parent operator (`..`): `/*/city[../state = 'NJ']`.
[[functions]]
== Functions
In the <<function_usage>> section above, we describe how and why to use a function in RecordPath. Here, we will describe the different functions that are available,
what they do, and how they work. Functions can be divided into two groups: <<standalone_functions>>, which can be the 'root' of a RecordPath, such as `substringAfter( /name, ' ' )`
and <<filter_functions>>, which are to be used as a filter, such as `/name[ contains('John') ]`. A Standalone Function can also be used within a filter but does not return a `boolean`
(`true` or `false` value) and therefore cannot itself be an entire filter. For example, we can use a path such as `/name[ substringAfter(., ' ') = 'Doe']` but we cannot simply use
`/name[ substringAfter(., ' ') ]` because doing so doesn't really make sense, as filters must be boolean values.
Unless otherwise noted, all of the examples below are written to operate on the following Record:
----
{
"name": "John Doe",
"workAddress": {
"number": "123",
"street": "5th Avenue",
"city": "New York",
"state": "NY",
"zip": "10020"
},
"homeAddress": {
"number": "456",
"street": "Grand St",
"city": "Jersey City",
"state": "NJ",
"zip": "07304"
},
"details": {
"position": "Dataflow Engineer",
"preferredState": "NY",
"employer": "",
"vehicle": null,
"phrase": " "
}
}
----
[[standalone_functions]]
== Standalone Functions
=== substring
The substring function returns a portion of a String value. The function requires 3 arguments: The value to take a portion of, the 0-based start index (inclusive),
and the 0-based end index (exclusive). The start index and end index can be `0` to indicate the first character of a String, a positive integer to indicate the nth index
into the string, or a negative integer. If the value is a negative integer, say `-n`, then this represents the `n`th character for the end. A value of `-1` indicates the last
character in the String. So, for example, `substring( 'hello world', 0, -1 )` means to take the string `hello`, and return characters 0 through the last character, so the return
value will be `hello world`.
|==========================================================
| RecordPath | Return value
| `substring( /name, 0, -1 )` | John Doe
| `substring( /name, 0, -5 )` | John
| `substring( /name, 1000, 1005 )` | <empty string>
| `substring( /name, 0, 1005)` | John Doe
| `substring( /name, -50, -1)` | <empty string>
|==========================================================
=== substringAfter
Returns the portion of a String value that occurs after the first occurrence of some other value.
|==========================================================
| RecordPath | Return value
| `substringAfter( /name, ' ' )` | Doe
| `substringAfter( /name, 'o' )` | hn Doe
| `substringAfter( /name, '' )` | John Doe
| `substringAfter( /name, 'xyz' )` | John Doe
|==========================================================
=== substringAfterLast
Returns the portion of a String value that occurs after the last occurrence of some other value.
|==========================================================
| RecordPath | Return value
| `substringAfterLast( /name, ' ' )` | Doe
| `substringAfterLast( /name, 'o' )` | e
| `substringAfterLast( /name, '' )` | John Doe
| `substringAfterLast( /name, 'xyz' )` | John Doe
|==========================================================
=== substringBefore
Returns the portion of a String value that occurs before the first occurrence of some other value.
|==========================================================
| RecordPath | Return value
| `substringBefore( /name, ' ' )` | John
| `substringBefore( /name, 'o' )` | J
| `substringBefore( /name, '' )` | John Doe
| `substringBefore( /name, 'xyz' )` | John Doe
|==========================================================
=== substringBeforeLast
Returns the portion of a String value that occurs before the last occurrence of some other value.
|==========================================================
| RecordPath | Return value
| `substringBeforeLast( /name, ' ' )` | John
| `substringBeforeLast( /name, 'o' )` | John D
| `substringBeforeLast( /name, '' )` | John Doe
| `substringBeforeLast( /name, 'xyz' )` | John Doe
|==========================================================
=== replace
Replaces all occurrences of a String with another String.
|==========================================================
| RecordPath | Return value
| `replace( /name, 'o', 'x' )` | Jxhn Dxe
| `replace( /name, 'o', 'xyz' )` | Jxyzhn Dxyze
| `replace( /name, 'xyz', 'zyx' )` | John Doe
| `replace( /name, 'Doe', /workAddress/city )` | John New York
|==========================================================
=== replaceRegex
Evaluates a Regular Expression against the contents of a String value and replaces any match with another value.
This function requires 3 arguments: the String to run the regular expression against, the regular expression to run,
and the replacement value. The replacement value may optionally use back-references, such as `$1` and `${named_group}`
|==================================================================
| RecordPath | Return value
| `replaceRegex( /name, 'o', 'x' )` | Jxhn Dxe
| `replaceRegex( /name, 'o', 'xyz' )` | Jxyzhn Dxyze
| `replaceRegex( /name, 'xyz', 'zyx' )` | John Doe
| `replaceRegex( /name, '\s+.*', /workAddress/city )` | John New York
| `replaceRegex(/name, '([JD])', '$1x')` | Jxohn Dxoe
| `replaceRegex(/name, '(?<hello>[JD])', '${hello}x')` | Jxohn Dxoe
|==================================================================
[[filter_functions]]
== Filter Functions
=== contains
Returns `true` if a String value contains the provided substring, `false` otherwise
|==============================================================================
| RecordPath | Return value
| `/name[contains(., 'o')]` | John Doe
| `/name[contains(., 'x')]` | <returns no results>
| `/name[contains( ../workAddress/state, /details/preferredState )]` | John Doe
|==============================================================================
=== matchesRegex
Evaluates a Regular Expression against the contents of a String value and returns `true` if the Regular Expression
exactly matches the String value, `false` otherwise.
This function requires 2 arguments: the String to run the regular expression against, and the regular expression to run.
|==============================================================================
| RecordPath | Return value
| `/name[matchesRegex(., 'John Doe')]` | John Doe
| `/name[matchesRegex(., 'John')]` | <returns no results>
| `/name[matchesRegex(., '.* Doe' )]` | John Doe
|==============================================================================
=== startsWith
Returns `true` if a String value starts with the provided substring, `false` otherwise
|==============================================================================
| RecordPath | Return value
| `/name[startsWith(., 'J')]` | John Doe
| `/name[startsWith(., 'x')]` | <returns no results>
| `/name[startsWith(., 'xyz')]` | <returns no results>
| `/name[startsWith(., '')]` | John Doe
|==============================================================================
=== endsWith
Returns `true` if a String value ends with the provided substring, `false` otherwise
|==============================================================================
| RecordPath | Return value
| `/name[endsWith(., 'e')]` | John Doe
| `/name[endsWith(., 'x')]` | <returns no results>
| `/name[endsWith(., 'xyz')]` | <returns no results>
| `/name[endsWith(., '')]` | John Doe
|==============================================================================
=== not
Inverts the value of the function or expression that is passed into the `not` function.
|==============================================================================
| RecordPath | Return value
| `/name[not(endsWith(., 'x'))]` | John Doe
| `/name[not(contains(., 'x'))]` | John Doe
| `/name[not(endsWith(., 'e'))]` | <returns no results>
|==============================================================================
=== isEmpty
Returns `true` if the provided value is either null or is an empty string.
|==============================================================================
| RecordPath | Return value
| `/name[isEmpty(/details/employer)]` | John Doe
| `/name[isEmpty(/details/vehicle)]` | John Doe
| `/name[isEmpty(/details/phase)]` | <returns no results>
| `/name[isEmpty(.)]` | <returns no results>
|==============================================================================
=== isBlank
Returns `true` if the provided value is either null or is an empty string or a string that consists
only of white space (spaces, tabs, carriage returns, and new-line characters).
|==============================================================================
| RecordPath | Return value
| `/name[isBlank(/details/employer)]` | John Doe
| `/name[isBlank(/details/vehicle)]` | John Doe
| `/name[isBlank(/details/phase)]` | John Doe
| `/name[isBlank(.)]` | <returns no results>
|==============================================================================

View File

@ -188,7 +188,7 @@ public class UpdateRecord extends AbstractRecordProcessor {
private void processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record, final String replacementValue) { private void processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record, final String replacementValue) {
destinationFields.forEach(fieldVal -> { destinationFields.forEach(fieldVal -> {
final RecordPathResult replacementResult = replacementRecordPath.evaluate(fieldVal); final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal);
final Object replacementObject = getReplacementObject(replacementResult, replacementValue); final Object replacementObject = getReplacementObject(replacementResult, replacementValue);
fieldVal.updateValue(replacementObject); fieldVal.updateValue(replacementObject);
}); });