NIFI-9677 Fixed issue that an empty JSON array causes flow file to be considered unmatched even though it should be considered as a match.

Refactored to avoid streaming over the list twice.

Tweaked unit test, so it works in NiFi 1.x also.

Signed-off-by: Matt Burgess <mattyb149@apache.org>
This commit is contained in:
jsteinebrey 2024-01-11 15:21:14 -06:00 committed by Matt Burgess
parent c441f90cfc
commit 4c1edbefa3
7 changed files with 164 additions and 2 deletions

View File

@ -81,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -489,9 +490,19 @@ public class LookupRecord extends AbstractProcessor {
final RecordPath recordPath = entry.getValue();
final RecordPathResult pathResult = recordPath.evaluate(record);
AtomicLong selectedFieldsCount = new AtomicLong(0);
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList());
.filter(fieldVal -> {
selectedFieldsCount.incrementAndGet();
return fieldVal.getValue() != null;
})
.collect(Collectors.toList());
if (selectedFieldsCount.get() == 0) {
// When selectedFieldsCount == 0; then an empty array was found which counts as a match.
// Since the array is empty, no further processing is needed, so continue to next recordPath.
continue;
}
if (lookupFieldValues.isEmpty()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;

View File

@ -527,6 +527,100 @@ public class TestLookupRecord {
out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath());
}
@Test
public void testLookupEmptyArray() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
final MapLookup lookupService = new MapLookupForInPlaceReplacement();
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.addControllerService("writer", jsonWriter);
runner.enableControllerService(jsonWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
runner.setProperty("lookupLanguage", "/locales[*]/language");
runner.setProperty("lookupRegion", "/locales[*]/region");
runner.setProperty("lookupCurrency", "/currencies[*]/currency");
runner.setProperty("lookupFoo", "/foo/foo");
runner.setProperty("lookupBar", "/bar");
lookupService.addValue("CA", "Canada");
lookupService.addValue("CAD", "Canadian dollar");
lookupService.addValue("en", "English");
lookupService.addValue("EUR", "Euro");
lookupService.addValue("ja", "Japanese");
lookupService.addValue("JP", "Japan");
lookupService.addValue("JPY", "Japanese yen");
lookupService.addValue("US", "United States");
lookupService.addValue("USD", "United States Dollar");
lookupService.addValue("fr", "French");
lookupService.addValue("FR", "France");
lookupService.addValue("original", "updated");
lookupService.addValue("orgValue", "newValue");
lookupService.addValue("orgValue2", "newValue2");
runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input-empty-array.json").toPath());
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-empty-array.json").toPath());
}
@Test
public void testLookupMissingJsonField() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
final MapLookup lookupService = new MapLookupForInPlaceReplacement();
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
runner.setProperty(jsonWriter, JsonRecordSetWriter.SUPPRESS_NULLS, JsonRecordSetWriter.ALWAYS_SUPPRESS);
runner.enableControllerService(jsonReader);
runner.enableControllerService(jsonWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
runner.setProperty("lookupFoo", "/foo/foo");
lookupService.addValue("original", "updated");
runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input-missing.json").toPath());
runner.run();
runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 1);
final MockFlowFile outUnmatched = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED).get(0);
outUnmatched.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-missing-unmatched.json").toPath());
runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
final MockFlowFile outMatched = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
outMatched.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output-missing-matched.json").toPath());
}
@Test
public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);

View File

@ -0,0 +1,40 @@
[
{
"bar" : "orgValue",
"foo" : {
"foo" : "original"
},
"locales": [
{
"language": "fr",
"region": "FR"
}, {
"language": "en",
"region": "US"
}
],
"currencies": []
}, {
"bar" : "orgValue2",
"foo" : {
"foo" : "original"
},
"locales": [
{
"language": "fr",
"region": "CA"
},
{
"language": "ja",
"region": "JP"
}
],
"currencies": [
{
"currency": "CAD"
}, {
"currency": "JPY"
}
]
}
]

View File

@ -0,0 +1,14 @@
[
{
"foo" : {
"foo" : "original"
},
"unmentioned" : {
"foo" : "original"
}
}, {
"unmentioned" : {
"foo" : "original"
}
}
]

View File

@ -0,0 +1 @@
[{"bar":"newValue","foo":{"foo":"updated"},"locales":[{"language":"French","region":"France"},{"language":"English","region":"United States"}],"currencies":[]},{"bar":"newValue2","foo":{"foo":"updated"},"locales":[{"language":"French","region":"Canada"},{"language":"Japanese","region":"Japan"}],"currencies":[{"currency":"Canadian dollar"},{"currency":"Japanese yen"}]}]

View File

@ -0,0 +1 @@
[{"foo":{"foo":"updated"},"unmentioned":{"foo":"original"}}]