NIFI-8137 Record Path EscapeJson/UnescapeJson functions (#4756)

* NIFI-8137 Record Path EscapeJson/UnescapeJson functions

* Correct jackson-databind dependency version

* Add negative tests for RecordPath JSON handling; rename RecordPath JSON classes to better match existing functions

Signed-off-by: Otto Fowler <ottobackwards@gmail.com>

This closes #4756.
This commit is contained in:
Chris Sampson 2021-06-04 14:19:24 +01:00 committed by GitHub
parent 5045adf29c
commit bc5204d4df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 402 additions and 9 deletions

View File

@ -599,15 +599,15 @@ public class ExpressionCompiler {
}
case ESCAPE_CSV: {
verifyArgCount(argEvaluators, 0, "escapeCsv");
return addToken(CharSequenceTranslatorEvaluator.csvEscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.csvEscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeCsv");
}
case ESCAPE_HTML3: {
verifyArgCount(argEvaluators, 0, "escapeHtml3");
return addToken(CharSequenceTranslatorEvaluator.html3EscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.html3EscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeHtml3");
}
case ESCAPE_HTML4: {
verifyArgCount(argEvaluators, 0, "escapeHtml4");
return addToken(CharSequenceTranslatorEvaluator.html4EscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.html4EscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeHtml4");
}
case ESCAPE_JSON: {
verifyArgCount(argEvaluators, 0, "escapeJson");
@ -615,27 +615,27 @@ public class ExpressionCompiler {
}
case ESCAPE_XML: {
verifyArgCount(argEvaluators, 0, "escapeXml");
return addToken(CharSequenceTranslatorEvaluator.xmlEscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.xmlEscapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeXml");
}
case UNESCAPE_CSV: {
verifyArgCount(argEvaluators, 0, "unescapeCsv");
return addToken(CharSequenceTranslatorEvaluator.csvUnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.csvUnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "unescapeCsv");
}
case UNESCAPE_HTML3: {
verifyArgCount(argEvaluators, 0, "unescapeHtml3");
return addToken(CharSequenceTranslatorEvaluator.html3UnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.html3UnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "unescapeHtml3");
}
case UNESCAPE_HTML4: {
verifyArgCount(argEvaluators, 0, "unescapeHtml4");
return addToken(CharSequenceTranslatorEvaluator.html4UnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.html4UnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "unescapeHtml4");
}
case UNESCAPE_JSON: {
verifyArgCount(argEvaluators, 0, "unescapeJson");
return addToken(CharSequenceTranslatorEvaluator.jsonUnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.jsonUnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "unescapeJson");
}
case UNESCAPE_XML: {
verifyArgCount(argEvaluators, 0, "unescapeXml");
return addToken(CharSequenceTranslatorEvaluator.xmlUnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "escapeJson");
return addToken(CharSequenceTranslatorEvaluator.xmlUnescapeEvaluator(toStringEvaluator(subjectEvaluator)), "unescapeXml");
}
case SUBSTRING_BEFORE: {
verifyArgCount(argEvaluators, 1, "substringBefore");

View File

@ -91,5 +91,10 @@
<artifactId>commons-codec</artifactId>
<version>1.14</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.util.stream.Stream;
/**
 * RecordPath function {@code jsonEscape}: serialises every non-null value selected by the
 * wrapped path into its JSON String representation.
 *
 * <p>Field values that are {@code null} are dropped from the result stream rather than being
 * serialised.</p>
 */
public class JsonEscape extends RecordPathSegment {
    // No per-instance configuration is applied to the mapper, so a single shared instance
    // avoids building a new ObjectMapper for every compiled RecordPath.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final RecordPathSegment recordPath;

    /**
     * @param recordPath the path whose selected values are to be serialised as JSON
     * @param absolute   passed through to {@link RecordPathSegment}
     */
    public JsonEscape(final RecordPathSegment recordPath, final boolean absolute) {
        super("jsonEscape", null, absolute);
        this.recordPath = recordPath;
    }

    /**
     * Evaluates the wrapped path and replaces each non-null selected value with its JSON String.
     *
     * @param context the evaluation context handed to the wrapped path
     * @return a stream of field values whose value is a JSON String; the field and parent of
     *         each original field value are retained
     * @throws RecordPathException if a value cannot be serialised as JSON
     */
    @Override
    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
        // Null values are removed here, so the mapping step never has to deal with them.
        return recordPath.evaluate(context)
                .filter(fv -> fv.getValue() != null)
                .map(this::toJsonFieldValue);
    }

    /** Builds a new field value holding the JSON String form of the given (non-null) field value. */
    private FieldValue toJsonFieldValue(final FieldValue fv) {
        Object value = fv.getValue();
        if (value instanceof Record) {
            // NOTE(review): presumably turns the Record into a plain object graph that Jackson can
            // serialise field-by-field - confirm against DataTypeUtils.convertRecordFieldtoObject.
            value = DataTypeUtils.convertRecordFieldtoObject(value, RecordFieldType.RECORD.getDataType());
        }

        try {
            return new StandardFieldValue(OBJECT_MAPPER.writeValueAsString(value), fv.getField(), fv.getParent().orElse(null));
        } catch (final JsonProcessingException e) {
            throw new RecordPathException("Unable to serialise Record Path value as JSON String", e);
        }
    }
}

View File

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.path.functions;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPathEvaluationContext;
import org.apache.nifi.record.path.StandardFieldValue;
import org.apache.nifi.record.path.exception.RecordPathException;
import org.apache.nifi.record.path.paths.RecordPathSegment;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Stream;
/**
 * RecordPath function {@code jsonUnescape}: parses the JSON String held by every non-null
 * value selected by the wrapped path and returns the parsed value in its place.
 *
 * <p>The shape of the parsed value is driven by the data type declared on the selected field:
 * record types are converted to Records, arrays of record types to arrays of Records, and
 * everything else is returned as whatever Jackson produces for {@code Object.class}.</p>
 */
public class JsonUnescape extends RecordPathSegment {
    private final RecordPathSegment recordPath;
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * @param recordPath the path whose selected values hold the JSON Strings to parse
     * @param absolute   passed through to {@link RecordPathSegment}
     */
    public JsonUnescape(final RecordPathSegment recordPath, final boolean absolute) {
        super("jsonUnescape", null, absolute);
        this.recordPath = recordPath;
    }

    /**
     * Evaluates the wrapped path, skips null values and parses each remaining value as JSON.
     *
     * @param context the evaluation context handed to the wrapped path
     * @return a stream of field values holding the parsed JSON; field and parent are retained
     * @throws IllegalArgumentException if a selected non-null value is not a String
     * @throws RecordPathException      if a String value cannot be parsed as JSON
     */
    @Override
    public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
        return recordPath.evaluate(context)
                .filter(candidate -> candidate.getValue() != null)
                .map(this::unescape);
    }

    /** Parses the JSON String carried by a single (non-null) field value. */
    private FieldValue unescape(final FieldValue fieldValue) {
        final Object rawValue = fieldValue.getValue();
        if (!(rawValue instanceof String)) {
            throw new IllegalArgumentException("Argument supplied to jsonUnescape must be a String");
        }

        try {
            // For a CHOICE field, let DataTypeUtils pick the concrete type for this value.
            final DataType declaredType = fieldValue.getField().getDataType();
            final DataType effectiveType = declaredType instanceof ChoiceDataType
                    ? DataTypeUtils.chooseDataType(rawValue, (ChoiceDataType) declaredType)
                    : declaredType;

            final Object parsed = convertFieldValue(rawValue, fieldValue.getField().getFieldName(), effectiveType);
            return new StandardFieldValue(parsed, fieldValue.getField(), fieldValue.getParent().orElse(null));
        } catch (final IOException e) {
            throw new RecordPathException("Unable to deserialise JSON String into Record Path value", e);
        }
    }

    /**
     * Parses {@code value} as JSON and adapts the result to {@code dataType}.
     *
     * @param value     the JSON text; its {@code toString()} form is what gets parsed
     * @param fieldName the field name forwarded to {@code DataTypeUtils.toRecord}
     * @param dataType  the type that decides which conversion is applied
     * @return a Record, an array (of Records when the element type is a record type), or the
     *         generic object produced by Jackson
     * @throws IOException if the text cannot be parsed by Jackson
     */
    @SuppressWarnings("unchecked")
    private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType) throws IOException {
        final String json = value.toString();

        if (dataType instanceof RecordDataType) {
            // A JSON object is read as a Map and then turned into a Record.
            final Map<String, Object> fields = objectMapper.readValue(json, Map.class);
            return DataTypeUtils.toRecord(fields, ((RecordDataType) dataType).getChildSchema(), fieldName);
        }

        if (dataType instanceof ArrayDataType) {
            final DataType elementType = ((ArrayDataType) dataType).getElementType();
            final Object[] elements = objectMapper.readValue(json, Object[].class);
            if (!(elementType instanceof RecordDataType)) {
                return elements;
            }
            // Each element of an array of record types is converted to a Record individually.
            return Arrays.stream(elements)
                    .map(element -> DataTypeUtils.toRecord(element, ((RecordDataType) elementType).getChildSchema(), fieldName))
                    .toArray();
        }

        // Anything else: let Jackson choose the natural Java representation.
        return objectMapper.readValue(json, Object.class);
    }
}

View File

@ -39,6 +39,7 @@ import org.apache.nifi.record.path.functions.Base64Decode;
import org.apache.nifi.record.path.functions.Base64Encode;
import org.apache.nifi.record.path.functions.Coalesce;
import org.apache.nifi.record.path.functions.Concat;
import org.apache.nifi.record.path.functions.JsonEscape;
import org.apache.nifi.record.path.functions.FieldName;
import org.apache.nifi.record.path.functions.Format;
import org.apache.nifi.record.path.functions.Hash;
@ -59,6 +60,7 @@ import org.apache.nifi.record.path.functions.ToString;
import org.apache.nifi.record.path.functions.ToUpperCase;
import org.apache.nifi.record.path.functions.TrimString;
import org.apache.nifi.record.path.functions.UUID5;
import org.apache.nifi.record.path.functions.JsonUnescape;
import java.util.ArrayList;
import java.util.List;
@ -308,6 +310,14 @@ public class RecordPathCompiler {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new Base64Decode(args[0], absolute);
}
case "jsonEscape": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new JsonEscape(args[0], absolute);
}
case "jsonUnescape": {
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
return new JsonUnescape(args[0], absolute);
}
case "hash":{
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
return new Hash(args[0], args[1], absolute);

View File

@ -38,6 +38,7 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -51,6 +52,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestRecordPath {
@ -67,6 +69,7 @@ public class TestRecordPath {
// substring is not a filter function so cannot be used as a predicate
try {
RecordPath.compile("/name[substring(., 1, 2)]");
fail("Expected RecordPathException");
} catch (final RecordPathException e) {
// expected
}
@ -1643,6 +1646,135 @@ public class TestRecordPath {
});
}
/**
 * Verifies that jsonEscape serialises a simple String field, a simple int field and a whole
 * nested Record (including arrays of Strings and arrays of Records) to the expected JSON text.
 */
@Test
public void testJsonEscape() {
    // Schema: person { firstName, age, nicknames[], addresses[] { address_1 } }
    final RecordSchema addressSchema = new SimpleRecordSchema(Collections.singletonList(
        new RecordField("address_1", RecordFieldType.STRING.getDataType())
    ));

    final RecordField firstNameField = new RecordField("firstName", RecordFieldType.STRING.getDataType());
    final RecordField ageField = new RecordField("age", RecordFieldType.INT.getDataType());
    final RecordField nicknamesField = new RecordField("nicknames", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()));
    final RecordField addressesField = new RecordField("addresses", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(addressSchema)));
    final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(firstNameField, ageField, nicknamesField, addressesField));

    final RecordSchema rootSchema = new SimpleRecordSchema(Collections.singletonList(
        new RecordField("person", RecordFieldType.RECORD.getRecordDataType(personSchema))
    ));

    // Record content
    final Map<String, Object> firstAddress = Collections.singletonMap("address_1", "123 Somewhere Street");
    final Map<String, Object> secondAddress = Collections.singletonMap("address_1", "456 Anywhere Road");

    final Map<String, Object> personValues = new HashMap<>();
    personValues.put("firstName", "John");
    personValues.put("age", 30);
    personValues.put("nicknames", new String[] {"J", "Johnny"});
    personValues.put("addresses", new MapRecord[]{
        new MapRecord(addressSchema, firstAddress),
        new MapRecord(addressSchema, secondAddress)
    });

    final Map<String, Object> rootValues = new HashMap<>();
    rootValues.put("person", new MapRecord(personSchema, personValues));

    final Record record = new MapRecord(rootSchema, rootValues);

    // simple String field
    final Object escapedName = RecordPath.compile("jsonEscape(/person/firstName)").evaluate(record)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals("\"John\"", escapedName);

    // simple int field
    final Object escapedAge = RecordPath.compile("jsonEscape(/person/age)").evaluate(record)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals("30", escapedAge);

    // whole nested Record
    final Object escapedPerson = RecordPath.compile("jsonEscape(/person)").evaluate(record)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals(
        "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}",
        escapedPerson
    );
}
/**
 * Verifies jsonUnescape against JSON objects (with a nested array of objects and with a nested
 * single object), a one-field object, a bare String, a bare int, malformed JSON (expects a
 * RecordPathException) and a non-String input (expects an IllegalArgumentException).
 */
@Test
public void testJsonUnescape() {
    final RecordSchema addressSchema = new SimpleRecordSchema(Collections.singletonList(
        new RecordField("address_1", RecordFieldType.STRING.getDataType())
    ));

    // "addresses" is a CHOICE between an array of address records and a single address record
    final RecordField addressesField = new RecordField("addresses", RecordFieldType.CHOICE.getChoiceDataType(
        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(addressSchema)),
        RecordFieldType.RECORD.getRecordDataType(addressSchema)
    ));
    final RecordSchema personSchema = new SimpleRecordSchema(Arrays.asList(
        new RecordField("firstName", RecordFieldType.STRING.getDataType()),
        new RecordField("age", RecordFieldType.INT.getDataType()),
        new RecordField("nicknames", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())),
        addressesField
    ));

    final RecordSchema rootSchema = new SimpleRecordSchema(Arrays.asList(
        new RecordField("person", RecordFieldType.RECORD.getRecordDataType(personSchema)),
        new RecordField("json_str", RecordFieldType.STRING.getDataType())
    ));

    // test CHOICE resulting in nested ARRAY of RECORDs
    final Record recordAddressesArray = new MapRecord(rootSchema,
        Collections.singletonMap(
            "json_str",
            "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}")
    );
    final Map<String, Object> expectedWithAddressArray = new HashMap<>();
    expectedWithAddressArray.put("firstName", "John");
    expectedWithAddressArray.put("age", 30);
    expectedWithAddressArray.put("nicknames", Arrays.asList("J", "Johnny"));
    expectedWithAddressArray.put("addresses", Arrays.asList(
        Collections.singletonMap("address_1", "123 Somewhere Street"),
        Collections.singletonMap("address_1", "456 Anywhere Road")
    ));
    final Object actualWithAddressArray = RecordPath.compile("jsonUnescape(/json_str)").evaluate(recordAddressesArray)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals(expectedWithAddressArray, actualWithAddressArray);

    // test CHOICE resulting in nested single RECORD
    final Record recordAddressesSingle = new MapRecord(rootSchema,
        Collections.singletonMap(
            "json_str",
            "{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":{\"address_1\":\"123 Somewhere Street\"}}")
    );
    final Map<String, Object> expectedWithSingleAddress = new HashMap<>();
    expectedWithSingleAddress.put("firstName", "John");
    expectedWithSingleAddress.put("age", 30);
    expectedWithSingleAddress.put("nicknames", Arrays.asList("J", "Johnny"));
    expectedWithSingleAddress.put("addresses", Collections.singletonMap("address_1", "123 Somewhere Street"));
    final Object actualWithSingleAddress = RecordPath.compile("jsonUnescape(/json_str)").evaluate(recordAddressesSingle)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals(expectedWithSingleAddress, actualWithSingleAddress);

    // test simple String field
    final Record recordJustName = new MapRecord(rootSchema, Collections.singletonMap("json_str", "{\"firstName\":\"John\"}"));
    final Map<String, Object> expectedJustName = new HashMap<>();
    expectedJustName.put("firstName", "John");
    final Object actualJustName = RecordPath.compile("jsonUnescape(/json_str)").evaluate(recordJustName)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals(expectedJustName, actualJustName);

    // test simple String
    final Record recordJustString = new MapRecord(rootSchema, Collections.singletonMap("json_str", "\"John\""));
    final Object actualJustString = RecordPath.compile("jsonUnescape(/json_str)").evaluate(recordJustString)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals("John", actualJustString);

    // test simple Int
    final Record recordJustInt = new MapRecord(rootSchema, Collections.singletonMap("json_str", "30"));
    final Object actualJustInt = RecordPath.compile("jsonUnescape(/json_str)").evaluate(recordJustInt)
        .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
    assertEquals(30, actualJustInt);

    // test invalid JSON
    final Record recordInvalidJson = new MapRecord(rootSchema, Collections.singletonMap("json_str", "{\"invalid\": \"json"));
    try {
        RecordPath.compile("jsonUnescape(/json_str)").evaluate(recordInvalidJson)
            .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
        fail("Expected a RecordPathException for invalid JSON");
    } catch (RecordPathException rpe) {
        assertEquals("Unable to deserialise JSON String into Record Path value", rpe.getMessage());
    }

    // test not String
    final Map<String, Object> ageOnly = Collections.singletonMap("age", 30);
    final Record recordNotString = new MapRecord(rootSchema, Collections.singletonMap("person", new MapRecord(personSchema, ageOnly)));
    try {
        RecordPath.compile("jsonUnescape(/person/age)").evaluate(recordNotString)
            .getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue();
        fail("Expected IllegalArgumentException for non-String input");
    } catch (IllegalArgumentException iae) {
        assertEquals("Argument supplied to jsonUnescape must be a String", iae.getMessage());
    }
}
@Test
public void testHash() {
final Record record = getCaseTestRecord();

View File

@ -851,6 +851,96 @@ The following record path expression would decode the String using Base64:
| `base64Decode(/name)` | John
|==========================================================
=== jsonEscape
JSON Stringifies a Record, Array or simple field (e.g. String), using the UTF-8 character set. For example, given a schema such as:
----
{
"type": "record",
"name": "events",
"fields": [{
"name": "person",
"type": "record",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "age", "type": "int" }
]
}]
}
----
and a record such as:
----
{
"person": {
"name" : "John",
"age" : 30
}
}
----
The following record path expression would convert the record into an escaped JSON String:
|==========================================================
| RecordPath | Return value
| `jsonEscape(/person)` | "{\"name\":\"John\",\"age\":30}"
| `jsonEscape(/person/name)` | "\"John\""
| `jsonEscape(/person/age)` | "30"
|==========================================================
=== jsonUnescape
Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set. For example, given a schema such as:
----
{
"type": "record",
"name": "events",
"fields": [{
"name": "person",
"type": "record",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "age", "type": "int" }
]
}]
}
----
and a record such as:
----
{
"json_str": "{\"person\":{\"name\":\"John\",\"age\":30}}"
}
----
The following record path expression would populate the record with unescaped JSON fields:
|==========================================================
| RecordPath | Return value
| `jsonUnescape(/json_str)` | {"person": {"name": "John", "age": 30}}
|==========================================================
Given a record such as:
----
{
"json_str": "\"John\""
}
----
The following record path expression would return:
|==========================================================
| RecordPath | Return value
| `jsonUnescape(/json_str)` | "John"
|==========================================================
Note that the target schema must be pre-defined if the unescaped JSON is to be set in a Record's fields - Infer Schema will not currently do this automatically.
=== hash
Converts a String using a hash algorithm. For example, given a schema such as: