mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 10:38:33 +00:00
NIFI-10865 allow RecordPath's unescapeJson to convert de-serialised JSON Objects into Records
NIFI-10865 allow UpdateRecord to replace the Record root for relative paths, e.g. when a RecordPath function is used to modify selected field(s) This closes #6708 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
parent
d399c5267f
commit
e2067c4ea1
@ -23,7 +23,10 @@ import org.apache.nifi.record.path.RecordPathEvaluationContext;
|
||||
import org.apache.nifi.record.path.StandardFieldValue;
|
||||
import org.apache.nifi.record.path.exception.RecordPathException;
|
||||
import org.apache.nifi.record.path.paths.RecordPathSegment;
|
||||
import org.apache.nifi.record.path.util.RecordPathUtils;
|
||||
import org.apache.nifi.serialization.record.DataType;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.type.ArrayDataType;
|
||||
import org.apache.nifi.serialization.record.type.ChoiceDataType;
|
||||
import org.apache.nifi.serialization.record.type.RecordDataType;
|
||||
@ -31,21 +34,29 @@ import org.apache.nifi.serialization.record.util.DataTypeUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class UnescapeJson extends RecordPathSegment {
|
||||
private final RecordPathSegment recordPath;
|
||||
|
||||
private final RecordPathSegment convertToRecordRecordPath;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
public UnescapeJson(final RecordPathSegment recordPath, final boolean absolute) {
|
||||
public UnescapeJson(final RecordPathSegment recordPath, final RecordPathSegment convertToRecordRecordPath, final boolean absolute) {
|
||||
super("unescapeJson", null, absolute);
|
||||
this.recordPath = recordPath;
|
||||
this.convertToRecordRecordPath = convertToRecordRecordPath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) {
|
||||
final boolean convertMapToRecord = convertToRecordRecordPath != null
|
||||
&& Boolean.parseBoolean(RecordPathUtils.getFirstStringValue(convertToRecordRecordPath, context));
|
||||
|
||||
final Stream<FieldValue> fieldValues = recordPath.evaluate(context);
|
||||
return fieldValues.filter(fv -> fv.getValue() != null)
|
||||
.map(fv -> {
|
||||
@ -58,7 +69,10 @@ public class UnescapeJson extends RecordPathSegment {
|
||||
dataType = DataTypeUtils.chooseDataType(value, (ChoiceDataType) fv.getField().getDataType());
|
||||
}
|
||||
|
||||
return new StandardFieldValue(convertFieldValue(value, fv.getField().getFieldName(), dataType), fv.getField(), fv.getParent().orElse(null));
|
||||
return new StandardFieldValue(
|
||||
convertFieldValue(value, fv.getField().getFieldName(), dataType, convertMapToRecord),
|
||||
fv.getField(), fv.getParent().orElse(null)
|
||||
);
|
||||
} catch (IOException e) {
|
||||
throw new RecordPathException("Unable to deserialise JSON String into Record Path value", e);
|
||||
}
|
||||
@ -69,7 +83,7 @@ public class UnescapeJson extends RecordPathSegment {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType) throws IOException {
|
||||
private Object convertFieldValue(final Object value, final String fieldName, final DataType dataType, final boolean convertMapToRecord) throws IOException {
|
||||
if (dataType instanceof RecordDataType) {
|
||||
// convert Maps to Records
|
||||
final Map<String, Object> map = objectMapper.readValue(value.toString(), Map.class);
|
||||
@ -85,7 +99,20 @@ public class UnescapeJson extends RecordPathSegment {
|
||||
return arr;
|
||||
} else {
|
||||
// generic conversion for simpler fields
|
||||
return objectMapper.readValue(value.toString(), Object.class);
|
||||
final Object parsed = objectMapper.readValue(value.toString(), Object.class);
|
||||
if (convertMapToRecord) {
|
||||
if (DataTypeUtils.isCompatibleDataType(parsed, RecordFieldType.RECORD.getDataType())) {
|
||||
return DataTypeUtils.toRecord(parsed, fieldName);
|
||||
} else if (DataTypeUtils.isArrayTypeCompatible(parsed, RecordFieldType.RECORD.getDataType())) {
|
||||
return Arrays.stream((Object[]) parsed).map(m -> DataTypeUtils.toRecord(m, fieldName)).toArray(Record[]::new);
|
||||
} else if (parsed instanceof Collection
|
||||
&& !((Collection<Object>) parsed).isEmpty()
|
||||
&& DataTypeUtils.isCompatibleDataType(((Collection<Object>) parsed).stream().findFirst().get(), RecordFieldType.RECORD.getDataType())) {
|
||||
return ((Collection<Object>) parsed).stream().map(m -> DataTypeUtils.toRecord(m, fieldName)).collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
|
||||
return parsed;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -329,8 +329,15 @@ public class RecordPathCompiler {
|
||||
return new EscapeJson(args[0], absolute);
|
||||
}
|
||||
case "unescapeJson": {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||
return new UnescapeJson(args[0], absolute);
|
||||
final int numArgs = argumentListTree.getChildCount();
|
||||
|
||||
if (numArgs == 1) {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 1, functionName, absolute);
|
||||
return new UnescapeJson(args[0], null, absolute);
|
||||
} else {
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
|
||||
return new UnescapeJson(args[0], args[1], absolute);
|
||||
}
|
||||
}
|
||||
case "hash":{
|
||||
final RecordPathSegment[] args = getArgPaths(argumentListTree, 2, functionName, absolute);
|
||||
|
@ -60,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
@SuppressWarnings("OptionalGetWithoutIsPresent")
|
||||
public class TestRecordPath {
|
||||
|
||||
private static final String USER_TIMEZONE_PROPERTY = "user.timezone";
|
||||
@ -280,7 +281,7 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final FieldValue fieldValue = RecordPath.compile("/attributes['city']").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertTrue(fieldValue.getField().getFieldName().equals("attributes"));
|
||||
assertEquals("attributes", fieldValue.getField().getFieldName());
|
||||
assertEquals("New York", fieldValue.getValue());
|
||||
assertEquals(record, fieldValue.getParentRecord().get());
|
||||
}
|
||||
@ -300,7 +301,7 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
final FieldValue fieldValue = RecordPath.compile("/attributes/.['city']").evaluate(record).getSelectedFields().findFirst().get();
|
||||
assertTrue(fieldValue.getField().getFieldName().equals("attributes"));
|
||||
assertEquals("attributes", fieldValue.getField().getFieldName());
|
||||
assertEquals("New York", fieldValue.getValue());
|
||||
assertEquals(record, fieldValue.getParentRecord().get());
|
||||
}
|
||||
@ -1094,21 +1095,24 @@ public class TestRecordPath {
|
||||
|
||||
// Special character cases
|
||||
values.put("name", "John Doe");
|
||||
assertEquals(
|
||||
"John\nDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\n')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals("John\nDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\n')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing whitespace to new line");
|
||||
|
||||
values.put("name", "John\nDoe");
|
||||
assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, '\\n', ' ')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing new line to whitespace");
|
||||
|
||||
values.put("name", "John Doe");
|
||||
assertEquals("John\tDoe", RecordPath.compile("replaceRegex(/name, '[\\s]', '\\t')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing whitespace to tab");
|
||||
|
||||
values.put("name", "John\tDoe");
|
||||
assertEquals("John Doe", RecordPath.compile("replaceRegex(/name, '\\t', ' ')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing tab to whitespace");
|
||||
|
||||
}
|
||||
|
||||
@ -1126,23 +1130,27 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
// Quotes
|
||||
// NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't to do so at NiFi UI.
|
||||
// NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't do so at NiFi UI.
|
||||
// The test record path is equivalent to replaceRegex(/name, '\'', '"')
|
||||
values.put("name", "'John' 'Doe'");
|
||||
assertEquals("\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, '\\'', '\"')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing quote to double-quote");
|
||||
|
||||
values.put("name", "\"John\" \"Doe\"");
|
||||
assertEquals("'John' 'Doe'", RecordPath.compile("replaceRegex(/name, '\"', '\\'')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing double-quote to single-quote");
|
||||
|
||||
values.put("name", "'John' 'Doe'");
|
||||
assertEquals("\"John\" \"Doe\"", RecordPath.compile("replaceRegex(/name, \"'\", \"\\\"\")")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing quote to double-quote, the function arguments are wrapped by double-quote");
|
||||
|
||||
values.put("name", "\"John\" \"Doe\"");
|
||||
assertEquals("'John' 'Doe'", RecordPath.compile("replaceRegex(/name, \"\\\"\", \"'\")")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing double-quote to single-quote, the function arguments are wrapped by double-quote");
|
||||
|
||||
}
|
||||
|
||||
@ -1160,15 +1168,17 @@ public class TestRecordPath {
|
||||
final Record record = new MapRecord(schema, values);
|
||||
|
||||
// Back-slash
|
||||
// NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't to do so at NiFi UI.
|
||||
// NOTE: At Java code, a single back-slash needs to be escaped with another-back slash, but needn't do so at NiFi UI.
|
||||
// The test record path is equivalent to replaceRegex(/name, '\\', '/')
|
||||
values.put("name", "John\\Doe");
|
||||
assertEquals("John/Doe", RecordPath.compile("replaceRegex(/name, '\\\\', '/')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing a back-slash to forward-slash");
|
||||
|
||||
values.put("name", "John/Doe");
|
||||
assertEquals("John\\Doe", RecordPath.compile("replaceRegex(/name, '/', '\\\\')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Replacing a forward-slash to back-slash");
|
||||
|
||||
}
|
||||
|
||||
@ -1188,11 +1198,13 @@ public class TestRecordPath {
|
||||
// Brackets
|
||||
values.put("name", "J[o]hn Do[e]");
|
||||
assertEquals("J(o)hn Do(e)", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\[', '('), '\\]', ')')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Square brackets can be escaped with back-slash");
|
||||
|
||||
values.put("name", "J(o)hn Do(e)");
|
||||
assertEquals("J[o]hn Do[e]", RecordPath.compile("replaceRegex(replaceRegex(/name, '\\(', '['), '\\)', ']')")
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
.evaluate(record).getSelectedFields().findFirst().get().getValue(),
|
||||
"Brackets can be escaped with back-slash");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1629,8 +1641,8 @@ public class TestRecordPath {
|
||||
RecordPath.compile("base64Encode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals(Base64.getEncoder().encodeToString("Doe".getBytes(StandardCharsets.UTF_8)),
|
||||
RecordPath.compile("base64Encode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertTrue(Arrays.equals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)),
|
||||
(byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
|
||||
assertArrayEquals(Base64.getEncoder().encode("xyz".getBytes(StandardCharsets.UTF_8)),
|
||||
(byte[]) RecordPath.compile("base64Encode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
List<Object> actualValues = RecordPath.compile("base64Encode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
|
||||
IntStream.range(0, 3).forEach(i -> {
|
||||
Object expectedObject = expectedValues.get(i);
|
||||
@ -1638,7 +1650,7 @@ public class TestRecordPath {
|
||||
if (actualObject instanceof String) {
|
||||
assertEquals(expectedObject, actualObject);
|
||||
} else if (actualObject instanceof byte[]) {
|
||||
assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject));
|
||||
assertArrayEquals((byte[]) expectedObject, (byte[]) actualObject);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -1660,7 +1672,7 @@ public class TestRecordPath {
|
||||
|
||||
assertEquals("John", RecordPath.compile("base64Decode(/firstName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertEquals("Doe", RecordPath.compile("base64Decode(/lastName)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
assertTrue(Arrays.equals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue()));
|
||||
assertArrayEquals("xyz".getBytes(StandardCharsets.UTF_8), (byte[]) RecordPath.compile("base64Decode(/b)").evaluate(record).getSelectedFields().findFirst().get().getValue());
|
||||
List<Object> actualValues = RecordPath.compile("base64Decode(/*)").evaluate(record).getSelectedFields().map(FieldValue::getValue).collect(Collectors.toList());
|
||||
IntStream.range(0, 3).forEach(i -> {
|
||||
Object expectedObject = expectedValues.get(i);
|
||||
@ -1668,7 +1680,7 @@ public class TestRecordPath {
|
||||
if (actualObject instanceof String) {
|
||||
assertEquals(expectedObject, actualObject);
|
||||
} else if (actualObject instanceof byte[]) {
|
||||
assertTrue(Arrays.equals((byte[]) expectedObject, (byte[]) actualObject));
|
||||
assertArrayEquals((byte[]) expectedObject, (byte[]) actualObject);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -1733,8 +1745,8 @@ public class TestRecordPath {
|
||||
new RecordField("json_str", RecordFieldType.STRING.getDataType())
|
||||
));
|
||||
|
||||
// test CHOICE resulting in nested ARRAY of RECORDs
|
||||
final Record recordAddressesArray = new MapRecord(schema,
|
||||
// test CHOICE resulting in nested ARRAY of Records
|
||||
final Record mapAddressesArray = new MapRecord(schema,
|
||||
Collections.singletonMap(
|
||||
"json_str",
|
||||
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]}")
|
||||
@ -1749,11 +1761,11 @@ public class TestRecordPath {
|
||||
Collections.singletonMap("address_1", "456 Anywhere Road")
|
||||
));
|
||||
}},
|
||||
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
|
||||
RecordPath.compile("unescapeJson(/json_str)").evaluate(mapAddressesArray).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
|
||||
);
|
||||
|
||||
// test CHOICE resulting in nested single RECORD
|
||||
final Record recordAddressesSingle = new MapRecord(schema,
|
||||
final Record mapAddressesSingle = new MapRecord(schema,
|
||||
Collections.singletonMap(
|
||||
"json_str",
|
||||
"{\"firstName\":\"John\",\"age\":30,\"nicknames\":[\"J\",\"Johnny\"],\"addresses\":{\"address_1\":\"123 Somewhere Street\"}}")
|
||||
@ -1765,7 +1777,35 @@ public class TestRecordPath {
|
||||
put("nicknames", Arrays.asList("J", "Johnny"));
|
||||
put("addresses", Collections.singletonMap("address_1", "123 Somewhere Street"));
|
||||
}},
|
||||
RecordPath.compile("unescapeJson(/json_str)").evaluate(recordAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
|
||||
RecordPath.compile("unescapeJson(/json_str, 'false')").evaluate(mapAddressesSingle).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
|
||||
);
|
||||
|
||||
// test single Record converted from Map Object
|
||||
final Record recordFromMap = new MapRecord(schema,
|
||||
Collections.singletonMap(
|
||||
"json_str",
|
||||
"{\"firstName\":\"John\",\"age\":30}")
|
||||
);
|
||||
assertEquals(
|
||||
DataTypeUtils.toRecord(new HashMap<String, Object>(){{
|
||||
put("firstName", "John");
|
||||
put("age", 30);
|
||||
}}, "json_str"),
|
||||
RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordFromMap).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
|
||||
);
|
||||
|
||||
// test collection of Record converted from Map collection
|
||||
final Record recordCollectionFromMaps = new MapRecord(schema,
|
||||
Collections.singletonMap(
|
||||
"json_str",
|
||||
"[{\"address_1\":\"123 Somewhere Street\"},{\"address_1\":\"456 Anywhere Road\"}]")
|
||||
);
|
||||
assertEquals(
|
||||
Arrays.asList(
|
||||
DataTypeUtils.toRecord(Collections.singletonMap("address_1", "123 Somewhere Street"), "json_str"),
|
||||
DataTypeUtils.toRecord(Collections.singletonMap("address_1", "456 Anywhere Road"), "json_str")
|
||||
),
|
||||
RecordPath.compile("unescapeJson(/json_str, 'true')").evaluate(recordCollectionFromMaps).getSelectedFields().findFirst().orElseThrow(IllegalStateException::new).getValue()
|
||||
);
|
||||
|
||||
// test simple String field
|
||||
|
@ -893,7 +893,10 @@ The following record path expression would convert the record into an escaped JS
|
||||
|
||||
=== unescapeJson
|
||||
|
||||
Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set. For example, given a schema such as:
|
||||
Converts a stringified JSON element to a Record, Array or simple field (e.g. String), using the UTF-8 character set.
|
||||
Optionally convert JSON Objects parsed as Maps into Records (defaults to false).
|
||||
|
||||
For example, given a schema such as:
|
||||
|
||||
----
|
||||
{
|
||||
@ -927,6 +930,23 @@ The following record path expression would populate the record with unescaped JS
|
||||
|
||||
Given a record such as:
|
||||
|
||||
----
|
||||
{
|
||||
"json_str": "{\"name\":\"John\",\"age\":30}"
|
||||
}
|
||||
----
|
||||
|
||||
The following record path expression would return:
|
||||
|
||||
|==========================================================
|
||||
| RecordPath | Return value
|
||||
| `unescapeJson(/json_str, 'true')` | {"name": "John", "age": 30} (as a Record)
|
||||
| `unescapeJson(/json_str, 'false')` | {"name"="John", "age"=30} (as a Map)
|
||||
| `unescapeJson(/json_str)` | {"name"="John", "age"=30} (as a Map)
|
||||
|==========================================================
|
||||
|
||||
Given a record such as:
|
||||
|
||||
----
|
||||
{
|
||||
"json_str": "\"John\""
|
||||
|
@ -661,23 +661,29 @@
|
||||
<exclude>src/test/resources/TestExtractGrok/apache.log</exclude>
|
||||
<exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude>
|
||||
<exclude>src/test/resources/TestExtractGrok/patterns</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/embedded-string.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/person-address.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/person-stringified-name.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/person-with-null-array.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/input/multi-arrays.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/embedded-string.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-null-array.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-capital-lastname.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/name-fields-only.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/name-and-mother-same.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-name.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/person-with-new-city.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/embedded-record.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-address.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-and-mother.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/schema/multi-arrays.avsc</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and1.json</exclude>
|
||||
<exclude>src/test/resources/TestUpdateRecord/output/updateArrays/multi-arrays-0and2.json</exclude>
|
||||
|
@ -201,32 +201,40 @@ public class UpdateRecord extends AbstractRecordProcessor {
|
||||
}
|
||||
|
||||
private Record processAbsolutePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record) {
|
||||
final RecordPathResult replacementResult = replacementRecordPath.evaluate(record);
|
||||
final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
|
||||
final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, null, record);
|
||||
final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList());
|
||||
|
||||
return updateRecord(destinationFieldValues, selectedFields, record);
|
||||
}
|
||||
|
||||
private boolean isReplacingRoot(final List<FieldValue> destinationFields) {
|
||||
return destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent();
|
||||
}
|
||||
|
||||
private Record processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, Record record) {
|
||||
final List<FieldValue> destinationFieldValues = destinationFields.collect(Collectors.toList());
|
||||
|
||||
for (final FieldValue fieldVal : destinationFieldValues) {
|
||||
final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldVal);
|
||||
final List<FieldValue> selectedFields = replacementResult.getSelectedFields().collect(Collectors.toList());
|
||||
final Object replacementObject = getReplacementObject(selectedFields);
|
||||
updateFieldValue(fieldVal, replacementObject);
|
||||
if (isReplacingRoot(destinationFieldValues)) {
|
||||
final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, destinationFieldValues.get(0), record);
|
||||
record = updateRecord(destinationFieldValues, selectedFields, record);
|
||||
} else {
|
||||
for (final FieldValue fieldVal : destinationFieldValues) {
|
||||
final List<FieldValue> selectedFields = getSelectedFields(replacementRecordPath, fieldVal, record);
|
||||
final Object replacementObject = getReplacementObject(selectedFields);
|
||||
updateFieldValue(fieldVal, replacementObject);
|
||||
}
|
||||
}
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
private Record updateRecord(final List<FieldValue> destinationFields, final List<FieldValue> selectedFields, final Record record) {
|
||||
if (destinationFields.size() == 1 && !destinationFields.get(0).getParentRecord().isPresent()) {
|
||||
if (isReplacingRoot(destinationFields)) {
|
||||
final Object replacement = getReplacementObject(selectedFields);
|
||||
if (replacement == null) {
|
||||
return record;
|
||||
}
|
||||
|
||||
if (replacement instanceof Record) {
|
||||
return (Record) replacement;
|
||||
}
|
||||
@ -262,6 +270,11 @@ public class UpdateRecord extends AbstractRecordProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
private List<FieldValue> getSelectedFields(final RecordPath replacementRecordPath, final FieldValue fieldValue, final Record record) {
|
||||
final RecordPathResult replacementResult = replacementRecordPath.evaluate(record, fieldValue);
|
||||
return replacementResult.getSelectedFields().collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Object getReplacementObject(final List<FieldValue> selectedFields) {
|
||||
if (selectedFields.size() > 1) {
|
||||
final List<RecordField> fields = selectedFields.stream().map(FieldValue::getField).collect(Collectors.toList());
|
||||
|
@ -438,6 +438,94 @@ public class TestUpdateRecord {
|
||||
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetRootWithUnescapeJsonCall() throws InitializationException, IOException {
|
||||
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", jsonReader);
|
||||
|
||||
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc")));
|
||||
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/name-fields-only.avsc")));
|
||||
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
|
||||
runner.enableControllerService(jsonReader);
|
||||
|
||||
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", jsonWriter);
|
||||
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
||||
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
|
||||
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(jsonWriter);
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person-stringified-name.json"));
|
||||
runner.setProperty("/", "unescapeJson(/stringified_name, 'true')");
|
||||
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
|
||||
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/name-fields-only.json")));
|
||||
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetFieldWithUnescapeJsonCall() throws InitializationException, IOException {
|
||||
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", jsonReader);
|
||||
|
||||
final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-stringified-name.avsc")));
|
||||
final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc")));
|
||||
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
|
||||
runner.enableControllerService(jsonReader);
|
||||
|
||||
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", jsonWriter);
|
||||
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
|
||||
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
|
||||
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(jsonWriter);
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person-stringified-name.json"));
|
||||
runner.setProperty("/name", "unescapeJson(/stringified_name)");
|
||||
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
|
||||
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-name.json")));
|
||||
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetNestedRecordWithUnescapeJsonCall() throws InitializationException, IOException {
|
||||
final JsonTreeReader jsonReader = new JsonTreeReader();
|
||||
runner.addControllerService("reader", jsonReader);
|
||||
|
||||
final String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/embedded-record.avsc")));
|
||||
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
|
||||
runner.enableControllerService(jsonReader);
|
||||
|
||||
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
|
||||
runner.addControllerService("writer", jsonWriter);
|
||||
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
|
||||
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
|
||||
runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
|
||||
runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
|
||||
runner.enableControllerService(jsonWriter);
|
||||
|
||||
runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/embedded-string.json"));
|
||||
runner.setProperty("/embedded", "unescapeJson(/str)");
|
||||
runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES);
|
||||
|
||||
runner.run();
|
||||
runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1);
|
||||
final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/embedded-record.json")));
|
||||
runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSetRootPathRelativeWithMultipleValues() throws InitializationException, IOException {
|
||||
|
@ -0,0 +1,4 @@
|
||||
{
|
||||
"str": "{\"label\":\"Test!\",\"child\":{\"name\":\"Child record!\"}}",
|
||||
"embedded": null
|
||||
}
|
@ -0,0 +1,4 @@
|
||||
{
|
||||
"id": 485,
|
||||
"stringified_name": "{\"last\": \"Doe\", \"first\": \"John\"}"
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
[ {
|
||||
"str" : "{\"label\":\"Test!\",\"child\":{\"name\":\"Child record!\"}}",
|
||||
"embedded" : {
|
||||
"label" : "Test!",
|
||||
"child" : {
|
||||
"name" : "Child record!"
|
||||
}
|
||||
}
|
||||
} ]
|
@ -0,0 +1,7 @@
|
||||
[ {
|
||||
"id" : 485,
|
||||
"name" : {
|
||||
"last" : "Doe",
|
||||
"first" : "John"
|
||||
}
|
||||
} ]
|
@ -0,0 +1,39 @@
|
||||
{
|
||||
"name": "Parent",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "str",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "embedded",
|
||||
"type": [
|
||||
{
|
||||
"name": "EmbeddedRecord",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "label",
|
||||
"type": "string"
|
||||
},
|
||||
{
|
||||
"name": "child",
|
||||
"type": {
|
||||
"name": "ChildRecord",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{
|
||||
"name": "name",
|
||||
"type": "string"
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"null"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
@ -0,0 +1,9 @@
|
||||
{
|
||||
"name": "personWithNameRecord",
|
||||
"namespace": "nifi",
|
||||
"type": "record",
|
||||
"fields": [
|
||||
{ "name": "id", "type": "int" },
|
||||
{ "name": "stringified_name", "type": "string" }
|
||||
]
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user