mirror of https://github.com/apache/nifi.git
NIFI-12538 Added mapOf Record Standalone Function
This closes #8182 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7a0449b425
commit
a7c9eccf4a
|
@ -0,0 +1,65 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.record.path.functions;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import org.apache.nifi.record.path.FieldValue;
|
||||||
|
import org.apache.nifi.record.path.RecordPathEvaluationContext;
|
||||||
|
import org.apache.nifi.record.path.StandardFieldValue;
|
||||||
|
import org.apache.nifi.record.path.paths.RecordPathSegment;
|
||||||
|
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||||
|
import org.apache.nifi.serialization.record.MapRecord;
|
||||||
|
import org.apache.nifi.serialization.record.Record;
|
||||||
|
import org.apache.nifi.serialization.record.RecordField;
|
||||||
|
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||||
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
|
||||||
|
public class MapOf extends RecordPathSegment {
|
||||||
|
private final RecordPathSegment[] valuePaths;
|
||||||
|
|
||||||
|
public MapOf(final RecordPathSegment[] valuePaths, final boolean absolute) {
|
||||||
|
super("mapOf", null, absolute);
|
||||||
|
this.valuePaths = valuePaths;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
|
||||||
|
|
||||||
|
final List<RecordField> fields = new ArrayList<>();
|
||||||
|
final java.util.Map<String, Object> values = new HashMap<>();
|
||||||
|
|
||||||
|
for (int i = 0; i + 1 < valuePaths.length; i += 2) {
|
||||||
|
final String key = valuePaths[i].evaluate(context).findFirst().get().toString();
|
||||||
|
final String value = valuePaths[i+1].evaluate(context).findFirst().get().toString();
|
||||||
|
fields.add(new RecordField(key, RecordFieldType.STRING.getDataType()));
|
||||||
|
values.put(key, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||||
|
final Record record = new MapRecord(schema, values);
|
||||||
|
final RecordField field = new RecordField("mapOf", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
|
||||||
|
|
||||||
|
final FieldValue responseValue = new StandardFieldValue(record, field, null);
|
||||||
|
return Stream.of(responseValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -45,6 +45,7 @@ import org.apache.nifi.record.path.functions.FieldName;
|
||||||
import org.apache.nifi.record.path.functions.FilterFunction;
|
import org.apache.nifi.record.path.functions.FilterFunction;
|
||||||
import org.apache.nifi.record.path.functions.Format;
|
import org.apache.nifi.record.path.functions.Format;
|
||||||
import org.apache.nifi.record.path.functions.Hash;
|
import org.apache.nifi.record.path.functions.Hash;
|
||||||
|
import org.apache.nifi.record.path.functions.MapOf;
|
||||||
import org.apache.nifi.record.path.functions.PadLeft;
|
import org.apache.nifi.record.path.functions.PadLeft;
|
||||||
import org.apache.nifi.record.path.functions.PadRight;
|
import org.apache.nifi.record.path.functions.PadRight;
|
||||||
import org.apache.nifi.record.path.functions.Replace;
|
import org.apache.nifi.record.path.functions.Replace;
|
||||||
|
@ -270,6 +271,20 @@ public class RecordPathCompiler {
|
||||||
|
|
||||||
return new Concat(argPaths, absolute);
|
return new Concat(argPaths, absolute);
|
||||||
}
|
}
|
||||||
|
case "mapOf": {
|
||||||
|
final int numArgs = argumentListTree.getChildCount();
|
||||||
|
|
||||||
|
if(numArgs % 2 != 0) {
|
||||||
|
throw new RecordPathException("The mapOf function requires an even number of arguments");
|
||||||
|
}
|
||||||
|
|
||||||
|
final RecordPathSegment[] argPaths = new RecordPathSegment[numArgs];
|
||||||
|
for (int i = 0; i < numArgs; i++) {
|
||||||
|
argPaths[i] = buildPath(argumentListTree.getChild(i), null, absolute);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new MapOf(argPaths, absolute);
|
||||||
|
}
|
||||||
case "toLowerCase": {
|
case "toLowerCase": {
|
||||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||||
return new ToLowerCase(args[0], absolute);
|
return new ToLowerCase(args[0], absolute);
|
||||||
|
|
|
@ -1242,6 +1242,26 @@ public class TestRecordPath {
|
||||||
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
assertEquals("John Doe: 48", RecordPath.compile("concat(/firstName, ' ', /lastName, ': ', 48)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMapOf() {
|
||||||
|
final List<RecordField> fields = new ArrayList<>();
|
||||||
|
fields.add(new RecordField("fullName", RecordFieldType.INT.getDataType()));
|
||||||
|
fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType()));
|
||||||
|
fields.add(new RecordField("firstName", RecordFieldType.LONG.getDataType()));
|
||||||
|
|
||||||
|
final RecordSchema schema = new SimpleRecordSchema(fields);
|
||||||
|
|
||||||
|
final Map<String, Object> values = new HashMap<>();
|
||||||
|
values.put("lastName", "Doe");
|
||||||
|
values.put("firstName", "John");
|
||||||
|
final Record record = new MapRecord(schema, values);
|
||||||
|
final FieldValue fv = RecordPath.compile("mapOf('firstName', /firstName, 'lastName', /lastName)").evaluate(record).getSelectedFields().findFirst().get();
|
||||||
|
assertTrue(fv.getField().getDataType().getFieldType().equals(RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()).getFieldType()));
|
||||||
|
assertEquals("MapRecord[{firstName=John, lastName=Doe}]", fv.getValue().toString());
|
||||||
|
|
||||||
|
assertThrows(RecordPathException.class, () -> RecordPath.compile("mapOf('firstName', /firstName, 'lastName')").evaluate(record));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCoalesce() {
|
public void testCoalesce() {
|
||||||
|
|
|
@ -1122,6 +1122,35 @@ Would yield a value of `2`. We could also use this as a filter, such as:
|
||||||
Which will return the `id` element with a value of `1234`.
|
Which will return the `id` element with a value of `1234`.
|
||||||
|
|
||||||
|
|
||||||
|
=== mapOf
|
||||||
|
|
||||||
|
Creates a map of Strings with the given parameters. For example, if we have the following record:
|
||||||
|
|
||||||
|
----
|
||||||
|
{
|
||||||
|
"firstName": "John",
|
||||||
|
"lastName": "Snow"
|
||||||
|
}
|
||||||
|
----
|
||||||
|
|
||||||
|
We could use the `UpdateRecord` processor with
|
||||||
|
|
||||||
|
----
|
||||||
|
/fullName => mapOf("firstName", /firstName, "lastName", /lastName)
|
||||||
|
----
|
||||||
|
|
||||||
|
And that would give us something like:
|
||||||
|
|
||||||
|
----
|
||||||
|
{
|
||||||
|
"firstName": "John",
|
||||||
|
"lastName": "Snow",
|
||||||
|
"fullName": {"firstName": "John", "lastName": "Snow"}
|
||||||
|
}
|
||||||
|
----
|
||||||
|
|
||||||
|
This function requires an even number of arguments and the record paths must represent simple field values.
|
||||||
|
|
||||||
[[filter_functions]]
|
[[filter_functions]]
|
||||||
== Filter Functions
|
== Filter Functions
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue