NIFI-9544 - LookupRecord - fixed behavior when no matching value in the LRS

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5638.
This commit is contained in:
Pierre Villard 2022-01-06 18:58:57 +01:00 committed by Joe Gresock
parent dd7131b257
commit 4847926a4b
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
5 changed files with 44 additions and 7 deletions

View File

@ -624,7 +624,9 @@
<exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
<exclude>src/test/resources/TestLookupRecord/lookup-array-input.json</exclude>
<exclude>src/test/resources/TestLookupRecord/lookup-array-input-unmatched.json</exclude>
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
<exclude>src/test/resources/TestLookupRecord/lookup-array-output-unmatched.json</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-schema.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/int-maps-data.json</exclude>
<exclude>src/test/resources/TestValidateRecord/array-and-map-with-null-element.avro</exclude>

View File

@ -309,6 +309,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
final String coordinateKey = lookupService.getRequiredKeys().iterator().next();
boolean hasUnmatchedValue = false;
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final RecordPath recordPath = entry.getValue();
@ -338,20 +339,22 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
}
if (!lookupValueOption.isPresent()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
return rels;
hasUnmatchedValue = true;
continue;
}
final Object lookupValue = lookupValueOption.get();
final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
fieldValue.updateValue(lookupValue, inferredDataType);
}
}
final Set<Relationship> rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
return rels;
if (hasUnmatchedValue) {
return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
} else {
return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
}
}
private Set<Relationship> doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {

View File

@ -539,14 +539,16 @@ public class TestLookupRecord {
runner.setProperty("lookupFoo", "/foo/foo");
lookupService.addValue("FR", "France");
lookupService.addValue("CA", "Canada");
lookupService.addValue("fr", "French");
lookupService.addValue("badkey", "value");
runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input-unmatched.json").toPath());
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
System.out.println(out.getContent());
out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-unmatched.json").toPath());
}
private static class MapLookup extends AbstractControllerService implements StringLookupService {

View File

@ -0,0 +1,29 @@
[
{
"foo" : {
"foo" : "key"
},
"locales": [
{
"language" : "en",
"region" : "CA"
}, {
"language" : "fr",
"region" : "FR"
}
]
}, {
"foo" : {
"foo" : "key"
},
"locales": [
{
"language" : "fr",
"region" : "CA"
}, {
"language" : "fr",
"region" : "FR"
}
]
}
]

View File

@ -0,0 +1 @@
[{"foo":{"foo":"key"},"locales":[{"language":"en","region":"CA"},{"language":"French","region":"France"}]},{"foo":{"foo":"key"},"locales":[{"language":"French","region":"CA"},{"language":"French","region":"France"}]}]