NIFI-7197 - In-place replacement in LookupRecord processor

This closes #4088

Signed-off-by: Mark Payne <markap14@hotmail.com>
This commit is contained in:
Pierre Villard 2020-02-25 08:53:36 -08:00 committed by Mark Payne
parent f4b65afb64
commit 578430c9d9
6 changed files with 445 additions and 15 deletions

View File

@ -583,6 +583,8 @@
<exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
<exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
<exclude>src/test/resources/TestLookupRecord/lookup-array-input.json</exclude>
<exclude>src/test/resources/TestLookupRecord/lookup-array-output.json</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -105,6 +105,14 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
// Allowable value "record-fields": per its description, all fields of the Record returned by the
// Lookup Service are inserted into the destination path.
static final AllowableValue RESULT_RECORD_FIELDS = new AllowableValue("record-fields", "Insert Record Fields",
"All of the fields in the Record that is retrieved from the Lookup Service will be inserted into the destination path.");
// Record Update Strategy "use-property": the "Result RecordPath" property selects the part of the
// record that receives the looked-up value. This is the default of REPLACEMENT_STRATEGY.
static final AllowableValue USE_PROPERTY = new AllowableValue("use-property", "Use Property",
"The \"Result RecordPath\" property will be used to determine which part of the record should be updated with the value returned by the Lookup Service");
// Record Update Strategy "replace-existing-values": each dynamic property is a RecordPath whose
// selected field value is used as the lookup key and is then overwritten with the lookup result.
// Validation rejects this strategy unless the Lookup Service requires exactly one key.
static final AllowableValue REPLACE_EXISTING_VALUES = new AllowableValue("replace-existing-values", "Replace Existing Values",
"The \"Result RecordPath\" property will be ignored and the lookup service must be a single simple key lookup service. Every dynamic property value should "
+ "be a record path. For each dynamic property, the value contained in the field corresponding to the record path will be used as the key in the Lookup "
+ "Service and the value returned by the Lookup Service will be used to replace the existing value. It is possible to configure multiple dynamic properties "
+ "to replace multiple values in one execution. This strategy only supports simple types replacements (strings, integers, etc).");
static final PropertyDescriptor LOOKUP_SERVICE = new PropertyDescriptor.Builder()
.name("lookup-service")
.displayName("Lookup Service")
@ -144,6 +152,16 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
.required(true)
.build();
/**
 * Required "Record Update Strategy" property (internal name "record-update-strategy").
 * It accepts only {@link #REPLACE_EXISTING_VALUES} or {@link #USE_PROPERTY}, defaults to
 * {@link #USE_PROPERTY}, and does not support Expression Language.
 */
static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("record-update-strategy")
.displayName("Record Update Strategy")
.description("This property defines the strategy to use when updating the record with the value returned by the Lookup Service.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(REPLACE_EXISTING_VALUES, USE_PROPERTY)
.defaultValue(USE_PROPERTY.getValue())
.required(true)
.build();
static final Relationship REL_MATCHED = new Relationship.Builder()
.name("matched")
.description("All records for which the lookup returns a value will be routed to this relationship")
@ -182,6 +200,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
properties.add(RESULT_RECORD_PATH);
properties.add(ROUTING_STRATEGY);
properties.add(RESULT_CONTENTS);
properties.add(REPLACEMENT_STRATEGY);
return properties;
}
@ -214,6 +233,18 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
}
final Set<String> requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
if(validationContext.getProperty(REPLACEMENT_STRATEGY).getValue().equals(REPLACE_EXISTING_VALUES.getValue())) {
// it must be a single key lookup service
if(requiredKeys.size() != 1) {
return Collections.singleton(new ValidationResult.Builder()
.subject(LOOKUP_SERVICE.getDisplayName())
.valid(false)
.explanation("When using \"" + REPLACE_EXISTING_VALUES.getDisplayName() + "\" as Record Update Strategy, "
+ "only a Lookup Service requiring a single key can be used.")
.build());
}
} else {
final Set<String> missingKeys = requiredKeys.stream()
.filter(key -> !dynamicPropNames.contains(key))
.collect(Collectors.toSet());
@ -233,6 +264,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
return validationResults;
}
}
return Collections.emptyList();
}
@ -263,6 +295,68 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPa
/**
 * Routes a single record by delegating to the update strategy selected through the
 * "Record Update Strategy" property.
 *
 * @param record the record to enrich
 * @param writeSchema the schema used for writing (not consulted here)
 * @param flowFile the FlowFile the record was read from
 * @param context the process context holding the processor configuration
 * @param flowFileContext the per-FlowFile tuple of lookup RecordPaths and a second RecordPath
 * @return the relationships the record should be routed to
 */
protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
        final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {

    final String configuredStrategy = context.getProperty(REPLACEMENT_STRATEGY).getValue();

    // Anything other than "Replace Existing Values" takes the Result RecordPath code path.
    if (!configuredStrategy.equals(REPLACE_EXISTING_VALUES.getValue())) {
        return doResultPathReplacement(record, flowFile, context, flowFileContext);
    }

    return doInPlaceReplacement(record, flowFile, context, flowFileContext);
}
/**
 * Implements the "Replace Existing Values" strategy: for every configured RecordPath, each
 * non-null field it selects is used as the single lookup key, and the field is overwritten in
 * place with the value returned by the Lookup Service.
 *
 * <p>A key that is missing from the Lookup Service no longer aborts the whole record. The field
 * keeps its current value, the remaining fields and properties are still processed, and the
 * record is reported as unmatched once everything has been visited. Returning at the first miss
 * left the record partially updated, with the outcome depending on the iteration order of the
 * configured properties.</p>
 *
 * @param record the record whose fields are replaced in place
 * @param flowFile the FlowFile the record belongs to; its attributes are passed to the lookup
 * @param context the process context, used to resolve the Lookup Service's single required key
 * @param flowFileContext tuple whose key is the map of dynamic property name to RecordPath
 * @return the unmatched (or success) relationship when a RecordPath selected no non-null field or
 *         at least one lookup returned nothing; otherwise the matched (or success) relationship
 * @throws ProcessException if the Lookup Service throws while performing a lookup
 */
private Set<Relationship> doInPlaceReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
    // Validation guarantees exactly one required key for this strategy; the cast is needed
    // because the service is obtained through the raw LookupService type.
    final String lookupKey = (String) context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys().iterator().next();

    final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
    final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());
    boolean hasUnmatchedValue = false;

    for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
        final String coordinateKey = entry.getKey();
        final RecordPath recordPath = entry.getValue();

        final RecordPathResult pathResult = recordPath.evaluate(record);
        final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
            .filter(fieldVal -> fieldVal.getValue() != null)
            .collect(Collectors.toList());

        // Unchanged from the previous behavior: a path that selects nothing ends processing of this record.
        if (lookupFieldValues.isEmpty()) {
            final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
            getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
            return rels;
        }

        for (final FieldValue fieldValue : lookupFieldValues) {
            // Numbers and booleans are passed through as-is; everything else is looked up by its string form.
            final Object currentValue = fieldValue.getValue();
            final Object coordinateValue = (currentValue instanceof Number || currentValue instanceof Boolean)
                ? currentValue : DataTypeUtils.toString(currentValue, (String) null);

            // The same single key is overwritten on every iteration, so the map never holds stale coordinates.
            lookupCoordinates.put(lookupKey, coordinateValue);

            final Optional<?> lookupValueOption;
            try {
                lookupValueOption = lookupService.lookup(lookupCoordinates, flowFile.getAttributes());
            } catch (final Exception e) {
                throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
            }

            if (!lookupValueOption.isPresent()) {
                // Leave this field untouched but keep replacing the others.
                hasUnmatchedValue = true;
                continue;
            }

            final Object lookupValue = lookupValueOption.get();
            final DataType inferredDataType = DataTypeUtils.inferDataType(lookupValue, RecordFieldType.STRING.getDataType());
            fieldValue.updateValue(lookupValue, inferredDataType);
        }
    }

    if (hasUnmatchedValue) {
        return routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
    }

    return routeToMatchedUnmatched ? MATCHED_COLLECTION : SUCCESS_COLLECTION;
}
private Set<Relationship> doResultPathReplacement(Record record, FlowFile flowFile, ProcessContext context, Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final Map<String, Object> lookupCoordinates = new HashMap<>(recordPaths.size());

View File

@ -0,0 +1,215 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>LookupRecord</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<p>
LookupRecord makes use of the NiFi <a href="../../../../../html/record-path-guide.html">
RecordPath Domain-Specific Language (DSL)</a> to allow the user to indicate which field(s),
depending on the Record Update Strategy, in the Record should be updated. The Record will
be updated using the value returned by the provided Lookup Service.
</p>
<h3>Record Update Strategy - Use Property</h3>
<p>
In this case, the user should add, to the Processor's configuration, as many User-defined
Properties as required by the Lookup Service to form the lookup coordinates. The name of
the properties should match the names expected by the Lookup Service.
</p>
<p>
The field evaluated using the path configured in the "Result RecordPath" property will be
the field updated with the value returned by the Lookup Service.
</p>
<p>
Let's assume a Simple Key Value Lookup Service containing the following key/value pairs:
</p>
<code>
<pre>
FR => France
CA => Canada
</pre>
</code>
<p>
Let's assume the following JSON with three records as input:
</p>
<code>
<pre>
[
{
"country": null,
"code": "FR"
}, {
"country": null,
"code": "CA"
}, {
"country": null,
"code": "JP"
}
]
</pre>
</code>
<p>
The processor is configured with "Use Property" as "Record Update Strategy", the "Result
RecordPath" is configured with "/country" and a user-defined property is added with the
name "key" (as required by this Lookup Service) and the value "/code".
</p>
<p>
When triggered, the processor will look for the value associated to the "/code" path and
will use the value as the "key" of the Lookup Service. The value returned by the Lookup
Service will be used to update the value corresponding to "/country". With the above
examples, it will produce:
</p>
<code>
<pre>
[
{
"country": "France",
"code": "FR"
}, {
"country": "Canada",
"code": "CA"
}, {
"country": null,
"code": "JP"
}
]
</pre>
</code>
<h3>Record Update Strategy - Replace Existing Values</h3>
<p>
With this strategy, the "Result RecordPath" property will be ignored and the configured Lookup
Service must be a single simple key lookup service. For each user-defined property, the value
contained in the field corresponding to the record path will be used as the key in the Lookup
Service and will be replaced by the value returned by the Lookup Service. It is possible to
configure multiple dynamic properties to update multiple fields in one execution. This strategy
only supports simple types replacements (strings, integers, etc).
</p>
<p>
Since this strategy allows in-place replacement, it is possible to use Record Paths for fields
contained in arrays.
</p>
<p>
Let's assume a Simple Key Value Lookup Service containing the following key/value pairs:
</p>
<code>
<pre>
FR => France
CA => Canada
fr => French
en => English
</pre>
</code>
<p>
Let's assume the following JSON with two records as input:
</p>
<code>
<pre>
[
{
"locales": [
{
"region": "FR",
"language": "fr"
}, {
"region": "US",
"language": "en"
}
]
}, {
"locales": [
{
"region": "CA",
"language": "fr"
},
{
"region": "JP",
"language": "ja"
}
]
}
]
</pre>
</code>
<p>
The processor is configured with "Replace Existing Values" as "Record Update Strategy",
two user-defined properties are added: "region" => "/locales[*]/region" and "language"
=> "/locales[*]/language".
</p>
<p>
When triggered, the processor will loop over the user-defined properties. First, it'll
search for the fields corresponding to "/locales[*]/region", for each value from the
record, the value will be used as the key with the Lookup Service and the value will
be replaced by the result returned by the Lookup Service. Example: the first region is
"FR" and this key is associated to the value "France" in the Lookup Service, so the
value "FR" is replaced by "France" in the record. With the above examples, it will
produce:
</p>
<code>
<pre>
[
{
"locales": [
{
"region": "France",
"language": "French"
}, {
"region": "US",
"language": "English"
}
]
}, {
"locales": [
{
"region": "Canada",
"language": "French"
},
{
"region": "JP",
"language": "ja"
}
]
}
]
</pre>
</code>
</body>
</html>

View File

@ -19,9 +19,13 @@ package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
@ -37,6 +41,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -436,6 +442,88 @@ public class TestLookupRecord {
out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
}
/**
 * Exercises the "Replace Existing Values" strategy with the "route to success" routing strategy,
 * using RecordPaths that select fields inside an array as well as a nested field. Every key
 * looked up by the three dynamic properties is registered in the lookup service, and the
 * processor output is compared with the expected JSON fixture.
 */
@Test
public void testLookupArray() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
final MapLookup lookupService = new MapLookup();
// JSON reader configured to infer the schema from the incoming data.
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
// JSON writer configured to inherit the schema of the records it is given.
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
// NOTE(review): the reader and writer are registered a second time under the same identifiers
// below. This looks redundant; confirm whether re-adding a service discards the properties set above.
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.addControllerService("writer", jsonWriter);
runner.enableControllerService(jsonWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
// Dynamic properties: each value is a RecordPath whose selected fields are replaced in place.
runner.setProperty("lookupLanguage", "/locales[*]/language");
runner.setProperty("lookupRegion", "/locales[*]/region");
runner.setProperty("lookupFoo", "/foo/foo");
lookupService.addValue("FR", "France");
lookupService.addValue("CA", "Canada");
lookupService.addValue("fr", "French");
lookupService.addValue("key", "value");
runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS);
final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath());
}
/**
 * Exercises the "Replace Existing Values" strategy with the matched/unmatched routing strategy
 * when a looked-up key is absent from the lookup service: the service holds "badkey" instead of
 * "key", and the test asserts that all FlowFiles go to the unmatched relationship.
 */
@Test
public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException {
TestRunner runner = TestRunners.newTestRunner(LookupRecord.class);
final MapLookup lookupService = new MapLookup();
// JSON reader configured to infer the schema from the incoming data.
final JsonTreeReader jsonReader = new JsonTreeReader();
runner.addControllerService("reader", jsonReader);
runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
// JSON writer configured to inherit the schema of the records it is given.
final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
runner.addControllerService("writer", jsonWriter);
runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
// NOTE(review): the reader and writer are registered a second time under the same identifiers
// below. This looks redundant; confirm whether re-adding a service discards the properties set above.
runner.addControllerService("reader", jsonReader);
runner.enableControllerService(jsonReader);
runner.addControllerService("writer", jsonWriter);
runner.enableControllerService(jsonWriter);
runner.addControllerService("lookup", lookupService);
runner.enableControllerService(lookupService);
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
runner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
// Dynamic properties: each value is a RecordPath whose selected fields are replaced in place.
runner.setProperty("lookupLanguage", "/locales[*]/language");
runner.setProperty("lookupRegion", "/locales[*]/region");
runner.setProperty("lookupFoo", "/foo/foo");
lookupService.addValue("FR", "France");
lookupService.addValue("CA", "Canada");
lookupService.addValue("fr", "French");
// "badkey" is registered in place of "key", so the "/foo/foo" lookup is expected to find nothing
// (presumably the input fixture carries "key" there; verify against the fixture file).
lookupService.addValue("badkey", "value");
runner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
runner.run();
runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED);
}
private static class MapLookup extends AbstractControllerService implements StringLookupService {
private final Map<String, String> values = new HashMap<>();
private Map<String, Object> expectedContext;
@ -449,6 +537,7 @@ public class TestLookupRecord {
return String.class;
}
@Override
public Optional<String> lookup(final Map<String, Object> coordinates, Map<String, String> context) {
validateContext(context);
return lookup(coordinates);

View File

@ -0,0 +1,29 @@
[
{
"foo" : {
"foo" : "key"
},
"locales": [
{
"language" : "fr",
"region" : "CA"
}, {
"language" : "fr",
"region" : "FR"
}
]
}, {
"foo" : {
"foo" : "key"
},
"locales": [
{
"language" : "fr",
"region" : "CA"
}, {
"language" : "fr",
"region" : "FR"
}
]
}
]

View File

@ -0,0 +1 @@
[{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]},{"foo":{"foo":"value"},"locales":[{"language":"French","region":"Canada"},{"language":"French","region":"France"}]}]