From 578430c9d9d52cabb0d60dd4a21634475ba2db9e Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Tue, 25 Feb 2020 08:53:36 -0800 Subject: [PATCH] NIFI-7197 - In-place replacement in LookupRecord processor This closes #4088 Signed-off-by: Mark Payne --- .../nifi-standard-processors/pom.xml | 2 + .../processors/standard/LookupRecord.java | 124 ++++++++-- .../additionalDetails.html | 215 ++++++++++++++++++ .../processors/standard/TestLookupRecord.java | 89 ++++++++ .../TestLookupRecord/lookup-array-input.json | 29 +++ .../TestLookupRecord/lookup-array-output.json | 1 + 6 files changed, 445 insertions(+), 15 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 568b4d50fc..1e5c3386eb 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -583,6 +583,8 @@ src/test/resources/TestValidateRecord/nested-map-schema.avsc src/test/resources/TestValidateRecord/timestamp.avsc src/test/resources/TestValidateRecord/timestamp.json + src/test/resources/TestLookupRecord/lookup-array-input.json + src/test/resources/TestLookupRecord/lookup-array-output.json diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 23d132536b..28705cc27b 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -105,6 +105,14 @@ public class LookupRecord extends AbstractRouteRecord requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys(); - final Set missingKeys = requiredKeys.stream() - .filter(key -> !dynamicPropNames.contains(key)) - .collect(Collectors.toSet()); - if (!missingKeys.isEmpty()) { - final List validationResults = new ArrayList<>(); - for (final String missingKey : missingKeys) { - final ValidationResult result = new ValidationResult.Builder() - .subject(missingKey) - .valid(false) - .explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey - + "'. Please add a new property to this Processor with a name '" + missingKey - + "' and provide a RecordPath that can be used to retrieve the appropriate value.") - .build(); - validationResults.add(result); + if(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue())) { + // it must be a single key lookup service + if(requiredKeys.size() != 1) { + return Collections.singleton(new ValidationResult.Builder() + .subject(LOOKUP_SERVICE.getDisplayName()) + .valid(false) + .explanation("When using \"" + REPLACE_EXISTING_VALUES.getDisplayName() + "\" as Record Update Strategy, " + + "only a Lookup Service requiring a single key can be used.") + .build()); } + } else { + final Set missingKeys = requiredKeys.stream() + .filter(key -> !dynamicPropNames.contains(key)) + .collect(Collectors.toSet()); - return validationResults; + if (!missingKeys.isEmpty()) { + final List validationResults = new ArrayList<>(); + for (final String missingKey : missingKeys) { + final ValidationResult result = new ValidationResult.Builder() + .subject(missingKey) + .valid(false) + .explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey + + "'. Please add a new property to this Processor with a name '" + missingKey + + "' and provide a RecordPath that can be used to retrieve the appropriate value.") + .build(); + validationResults.add(result); + } + + return validationResults; + } } return Collections.emptyList(); @@ -263,6 +295,68 @@ public class LookupRecord extends AbstractRouteRecord route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, final Tuple, RecordPath> flowFileContext) { + final boolean isInPlaceReplacement = context.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue()); + + if(isInPlaceReplacement) { + return doInPlaceReplacement(record, flowFile, context, flowFileContext); + } else { + return doResultPathReplacement(record, flowFile, context, flowFileContext); + } + + } + + private Set doInPlaceReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple, RecordPath> flowFileContext) { + + final String lookupKey = (String) context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys().iterator().next(); + + final Map recordPaths = flowFileContext.getKey(); + final Map lookupCoordinates = new HashMap<>(recordPaths.size()); + + for (final Map.Entry entry : recordPaths.entrySet()) { + final String coordinateKey = entry.getKey(); + final RecordPath recordPath = entry.getValue(); + + final RecordPathResult pathResult = recordPath.evaluate(record); + final List lookupFieldValues = pathResult.getSelectedFields() + .filter(fieldVal -> fieldVal.getValue() != null) + .collect(Collectors.toList()); + + if (lookupFieldValues.isEmpty()) { + final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels}); + return rels; + } + + for (FieldValue fieldValue : lookupFieldValues) { + final Object coordinateValue = (fieldValue.getValue() instanceof Number || fieldValue.getValue() instanceof Boolean) + ? fieldValue.getValue() : DataTypeUtils.toString(fieldValue.getValue(), (String) null); + lookupCoordinates.put(lookupKey, coordinateValue); + + final Optional lookupValueOption; + try { + lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes()); + } catch (final Exception e) { + throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); + } + + if (!lookupValueOption.isPresent()) { + final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + return rels; + } + + final Object lookupValue = lookupValueOption.get(); + + final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType()); + fieldValue.updateValue(lookupValue, inferredDataType); + + } + } + + final Set rels = routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION; + return rels; + } + + private Set doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple, RecordPath> flowFileContext) { final Map recordPaths = flowFileContext.getKey(); final Map lookupCoordinates = new HashMap<>(recordPaths.size()); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html new file mode 100644 index 0000000000..df837080e2 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.LookupRecord/additionalDetails.html @@ -0,0 +1,215 @@ + + + + + + LookupRecord + + + + + +

+ LookupRecord makes use of the NiFi + RecordPath Domain-Specific Language (DSL) to allow the user to indicate which field(s), + depending on the Record Update Strategy, in the Record should be updated. The Record will + be updated using the value returned by the provided Lookup Service. +

+ +

Record Update Strategy - Use Property

+ +

+ In this case, the user should add, to the Processor's configuration, as much User-defined + Properties as required by the Lookup Service to form the lookup coordinates. The name of + the properties should match the names expected by the Lookup Service. +

+ +

+ The field evaluated using the path configured in the "Result RecordPath" property will be + the field updated with the value returned by the Lookup Service. +

+ +

+ Let's assume a Simple Key Value Lookup Service containing the following key/value pairs: +

+ + +
+FR => France
+CA => Canada
+
+
+ +

+ Let's assume the following JSON with three records as input: +

+ + +
+[
+	{
+		"country": null,
+		"code": "FR"
+	}, {
+		"country": null,
+		"code": "CA"
+	}, {
+		"country": null,
+		"code": "JP"
+	}
+]
+
+
+ +

+ The processor is configured with "Use Property" as "Record Update Strategy", the "Result + RecordPath" is configured with "/country" and a user-defined property is added with the + name "key" (as required by this Lookup Service) and the value "/code". +

+ +

+ When triggered, the processor will look for the value associated to the "/code" path and + will use the value as the "key" of the Lookup Service. The value returned by the Lookup + Service will be used to update the value corresponding to "/country". With the above + examples, it will produce: +

+ + +
+[
+	{
+		"country": "France",
+		"code": "FR"
+	}, {
+		"country": "Canada",
+		"code": "CA"
+	}, {
+		"country": null,
+		"code": "JP"
+	}
+]
+
+
+ +

Record Update Strategy - Replace Existing Values

+ +

+ With this strategy, the "Result RecordPath" property will be ignored and the configured Lookup + Service must be a single single key lookup service. For each user-defined property, the value + contained in the field corresponding to the record path will be used as the key in the Lookup + Service and will be replaced by the value returned by the Lookup Service. It is possible to + configure multiple dynamic properties to update multiple fields in one execution. This strategy + only supports simple types replacements (strings, integers, etc). +

+ +

+ Since this strategy allows in-place replacement, it is possible to use Record Paths for fields + contained in arrays. +

+ +

+ Let's assume a Simple Key Value Lookup Service containing the following key/value pairs: +

+ + +
+FR => France
+CA => Canada
+fr => French
+en => English
+
+
+ +

+ Let's assume the following JSON with two records as input: +

+ + +
+[
+	{
+		"locales": [
+			{
+				"region": "FR",
+				"language": "fr"
+			}, {
+				"region": "US",
+				"language": "en"
+			}
+		]
+	}, {
+		"locales": [
+			{
+				"region": "CA",
+				"language": "fr"
+			}, 
+			{
+				"region": "JP",
+				"language": "ja"
+			}
+		]
+	}
+]
+
+
+ +

+ The processor is configured with "Replace Existing Values" as "Record Update Strategy", + two user-defined properties are added: "region" => "/locales[*]/region" and "language + => "/locales[*]/language".. +

+ +

+ When triggered, the processor will loop over the user-defined properties. First, it'll + search for the fields corresponding to "/locales[*]/region", for each value from the + record, the value will be used as the key with the Lookup Service and the value will + be replaced by the result returned by the Lookup Service. Example: the first region is + "FR" and this key is associated to the value "France" in the Lookup Service, so the + value "FR" is replaced by "France" in the record. With the above examples, it will + produce: +

+ + +
+[
+	{
+		"locales": [
+			{
+				"region": "France",
+				"language": "French"
+			}, {
+				"region": "US",
+				"language": "English"
+			}
+		]
+	}, {
+		"locales": [
+			{
+				"region": "Canada",
+				"language": "French"
+			}, 
+			{
+				"region": "JP",
+				"language": "ja"
+			}
+		]
+	}
+]
+
+
+ + + \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index f8fb158004..86bba8a813 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -19,9 +19,13 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.lookup.RecordLookupService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -37,6 +41,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -436,6 +442,88 @@ public class TestLookupRecord { out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n"); } + @Test + public void testLookupArray() throws InitializationException, IOException { + TestRunner runner = TestRunners.newTestRunner(LookupRecord.class); + final MapLookup lookupService = new MapLookup(); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + + runner.addControllerService("reader", jsonReader); + runner.enableControllerService(jsonReader); + runner.addControllerService("writer", jsonWriter); + runner.enableControllerService(jsonWriter); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS); + runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES); + runner.setProperty(LookupRecord.RECORD_READER, "reader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); + runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); + runner.setProperty("lookupLanguage", "/locales[*]/language"); + runner.setProperty("lookupRegion", "/locales[*]/region"); + runner.setProperty("lookupFoo", "/foo/foo"); + + lookupService.addValue("FR", "France"); + lookupService.addValue("CA", "Canada"); + lookupService.addValue("fr", "French"); + lookupService.addValue("key", "value"); + + runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath()); + runner.run(); + + runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS); + final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0); + out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath()); + } + + @Test + public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException { + TestRunner runner = TestRunners.newTestRunner(LookupRecord.class); + final MapLookup lookupService = new MapLookup(); + + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + + runner.addControllerService("reader", jsonReader); + runner.enableControllerService(jsonReader); + runner.addControllerService("writer", jsonWriter); + runner.enableControllerService(jsonWriter); + runner.addControllerService("lookup", lookupService); + runner.enableControllerService(lookupService); + + runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); + runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES); + runner.setProperty(LookupRecord.RECORD_READER, "reader"); + runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); + runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); + runner.setProperty("lookupLanguage", "/locales[*]/language"); + runner.setProperty("lookupRegion", "/locales[*]/region"); + runner.setProperty("lookupFoo", "/foo/foo"); + + lookupService.addValue("FR", "France"); + lookupService.addValue("CA", "Canada"); + lookupService.addValue("fr", "French"); + lookupService.addValue("badkey", "value"); + + runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath()); + runner.run(); + + runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED); + } + private static class MapLookup extends AbstractControllerService implements StringLookupService { private final Map values = new HashMap<>(); private Map expectedContext; @@ -449,6 +537,7 @@ public class TestLookupRecord { return String.class; } + @Override public Optional lookup(final Map coordinates, Map context) { validateContext(context); return lookup(coordinates); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json new file mode 100644 index 0000000000..f2902cd1f7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-input.json @@ -0,0 +1,29 @@ +[ + { + "foo" : { + "foo" : "key" + }, + "locales": [ + { + "language" : "fr", + "region" : "CA" + }, { + "language" : "fr", + "region" : "FR" + } + ] + }, { + "foo" : { + "foo" : "key" + }, + "locales": [ + { + "language" : "fr", + "region" : "CA" + }, { + "language" : "fr", + "region" : "FR" + } + ] + } +] diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json new file mode 100644 index 0000000000..10169f8ed1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestLookupRecord/lookup-array-output.json @@ -0,0 +1 @@ +[{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]},{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]}] \ No newline at end of file