From 7f8987471d18deececd71bed0cbdee28b9c6254f Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 19 May 2017 16:42:51 -0400 Subject: [PATCH] NIFI-3946: Update LookupService to take a Map instead of a String for the input NIFI-3946: Fixed issues where null values were returned instead of empty optionals This closes #1833. Signed-off-by: Bryan Bende --- .../lookup/script/ScriptedLookupService.java | 14 +- .../script/TestScriptedLookupService.groovy | 6 +- .../groovy/test_lookup_inline.groovy | 11 +- .../processors/standard/LookupRecord.java | 147 +++++++++++++----- .../processors/standard/TestLookupRecord.java | 38 ++++- .../org/apache/nifi/lookup/LookupService.java | 23 ++- .../nifi/lookup/RecordLookupService.java | 11 +- .../nifi/lookup/StringLookupService.java | 9 +- .../lookup/SimpleKeyValueLookupService.java | 23 ++- .../nifi/lookup/maxmind/IPLookupService.java | 85 ++++++---- 10 files changed, 266 insertions(+), 101 deletions(-) diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java index da846ecdb3..ca79ba9b24 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/lookup/script/ScriptedLookupService.java @@ -46,7 +46,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.atomic.AtomicReference; /** @@ -64,9 +66,14 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp private volatile File kerberosServiceKeytab = null; @Override - public Optional lookup(String key) throws LookupFailureException { + public Optional lookup(Map coordinates) throws LookupFailureException { // Delegate the lookup() call to the scripted LookupService - return lookupService.get().lookup(key); + return lookupService.get().lookup(coordinates); + } + + @Override + public Set getRequiredKeys() { + return lookupService.get().getRequiredKeys(); } @Override @@ -177,6 +184,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp } } + @Override @OnEnabled public void onEnabled(final ConfigurationContext context) { synchronized (scriptingComponentHelper.isInitialized) { @@ -236,6 +244,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp } } + @Override public void setup() { // Create a single script engine, the Processor object is reused by each task if (scriptEngine == null) { @@ -263,6 +272,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp * @param scriptBody An input stream associated with the script content * @return Whether the script was successfully reloaded */ + @Override protected boolean reloadScript(final String scriptBody) { // note we are starting here with a fresh listing of validation // results since we are (re)loading a new/updated script. any diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy index 439b37d32e..1dc903f7f3 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/lookup/script/TestScriptedLookupService.groovy @@ -92,13 +92,13 @@ class TestScriptedLookupService { MockFlowFile mockFlowFile = new MockFlowFile(1L) InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) - Optional opt = scriptedLookupService.lookup('Hello') + Optional opt = scriptedLookupService.lookup(['key':'Hello']) assertTrue(opt.present) assertEquals('Hi', opt.get()) - opt = scriptedLookupService.lookup('World') + opt = scriptedLookupService.lookup(['key':'World']) assertTrue(opt.present) assertEquals('there', opt.get()) - opt = scriptedLookupService.lookup('Not There') + opt = scriptedLookupService.lookup(['key':'Not There']) assertFalse(opt.present) } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy index 273ccb97cb..8285b62d4b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_lookup_inline.groovy @@ -15,6 +15,8 @@ * limitations under the License. */ +import java.util.Set + import org.apache.nifi.controller.ControllerServiceInitializationContext import org.apache.nifi.reporting.InitializationException @@ -28,10 +30,15 @@ class GroovyLookupService implements LookupService { @Override - Optional lookup(String key) { + Optional lookup(Map coordinates) { + final String key = coordinates.values().iterator().next(); Optional.ofNullable(lookupTable[key]) } - + + Set getRequiredKeys() { + return java.util.Collections.emptySet(); + } + @Override Class getValueType() { return String diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java index 583ea5181d..4658c9563c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupRecord.java @@ -19,26 +19,31 @@ package org.apache.nifi.processors.standard; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.lookup.LookupService; import org.apache.nifi.processor.ProcessContext; @@ -64,18 +69,19 @@ import org.apache.nifi.util.Tuple; @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") }) @Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"}) -@CapabilityDescription("Extracts a field from a Record and looks up its value in a LookupService. If a result is returned by the LookupService, " +@CapabilityDescription("Extracts one or more fields from a Record and looks up a value for those fields in a LookupService. If a result is returned by the LookupService, " + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then " + "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), " - + "indicating whether or not a result was returned by the LookupService, " - + "allowing the processor to also function as a Routing processor. If any record in the incoming FlowFile has multiple fields match the configured " - + "Lookup RecordPath or if no fields match, then that record will be routed to 'unmatched' (or 'success', depending on the configuration of the 'Routing Strategy' property). " + + "indicating whether or not a result was returned by the LookupService, allowing the processor to also function as a Routing processor. " + + "The \"coordinates\" to use for looking up a value in the Lookup Service are defined by adding a user-defined property. Each property that is added will have an entry added " + + "to a Map, where the name of the property becomes the Map Key and the value returned by the RecordPath becomes the value for that key. If multiple values are returned by the " + + "RecordPath, then the Record will be routed to the 'unmatched' relationship (or 'success', depending on the 'Routing Strategy' property's configuration). " + "If one or more fields match the Result RecordPath, all fields " + "that match will be updated. If there is no match in the configured LookupService, then no fields will be updated. I.e., it will not overwrite an existing value in the Record " + "with a null value. Please note, however, that if the results returned by the LookupService are not accounted for in your schema (specifically, " + "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.") @SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"}) -public class LookupRecord extends AbstractRouteRecord> { +public class LookupRecord extends AbstractRouteRecord, RecordPath>> { private volatile RecordPathCache recordPathCache = new RecordPathCache(25); private volatile LookupService lookupService; @@ -94,15 +100,6 @@ public class LookupRecord extends AbstractRouteRecord properties = new ArrayList<>(); properties.addAll(super.getSupportedPropertyDescriptors()); properties.add(LOOKUP_SERVICE); - properties.add(LOOKUP_RECORD_PATH); properties.add(RESULT_RECORD_PATH); properties.add(ROUTING_STRATEGY); return properties; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("A RecordPath that points to the field whose value will be looked up in the configured Lookup Service") + .addValidator(new RecordPathValidator()) + .expressionLanguageSupported(true) + .required(false) + .dynamic(true) + .build(); + } + + @Override + @SuppressWarnings("unchecked") + protected Collection customValidate(final ValidationContext validationContext) { + final Set dynamicPropNames = validationContext.getProperties().keySet().stream() + .filter(prop -> prop.isDynamic()) + .map(prop -> prop.getName()) + .collect(Collectors.toSet()); + + if (dynamicPropNames.isEmpty()) { + return Collections.singleton(new ValidationResult.Builder() + .subject("User-Defined Properties") + .valid(false) + .explanation("At least one user-defined property must be specified.") + .build()); + } + + final Set requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys(); + final Set missingKeys = requiredKeys.stream() + .filter(key -> !dynamicPropNames.contains(key)) + .collect(Collectors.toSet()); + + if (!missingKeys.isEmpty()) { + final List validationResults = new ArrayList<>(); + for (final String missingKey : missingKeys) { + final ValidationResult result = new ValidationResult.Builder() + .subject(missingKey) + .valid(false) + .explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey + + "'. Please add a new property to this Processor with a name '" + missingKey + + "' and provide a RecordPath that can be used to retrieve the appropriate value.") + .build(); + validationResults.add(result); + } + + return validationResults; + } + + return Collections.emptyList(); + } + @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { if (ROUTING_STRATEGY.equals(descriptor)) { @@ -189,33 +237,43 @@ public class LookupRecord extends AbstractRouteRecord route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, - final Tuple flowFileContext) { + final Tuple, RecordPath> flowFileContext) { - final RecordPathResult lookupPathResult = flowFileContext.getKey().evaluate(record); - final List lookupFieldValues = lookupPathResult.getSelectedFields() - .filter(fieldVal -> fieldVal.getValue() != null) - .collect(Collectors.toList()); + final Map recordPaths = flowFileContext.getKey(); + final Map lookupCoordinates = new HashMap<>(recordPaths.size()); - if (lookupFieldValues.isEmpty()) { - final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; - getLogger().debug("Lookup RecordPath did not match any fields in a record for {}; routing record to " + rels, new Object[] {flowFile}); - return rels; + for (final Map.Entry entry : recordPaths.entrySet()) { + final String coordinateKey = entry.getKey(); + final RecordPath recordPath = entry.getValue(); + + final RecordPathResult pathResult = recordPath.evaluate(record); + final List lookupFieldValues = pathResult.getSelectedFields() + .filter(fieldVal -> fieldVal.getValue() != null) + .collect(Collectors.toList()); + + if (lookupFieldValues.isEmpty()) { + final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels}); + return rels; + } + + if (lookupFieldValues.size() > 1) { + final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; + getLogger().debug("RecordPath for property '{}' matched {} fields in a record for {}; routing record to {}", + new Object[] {coordinateKey, lookupFieldValues.size(), flowFile, rels}); + return rels; + } + + final FieldValue fieldValue = lookupFieldValues.get(0); + final String coordinateValue = DataTypeUtils.toString(fieldValue.getValue(), (String) null); + lookupCoordinates.put(coordinateKey, coordinateValue); } - if (lookupFieldValues.size() > 1) { - final Set rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; - getLogger().debug("Lookup RecordPath matched {} fields in a record for {}; routing record to " + rels, new Object[] {lookupFieldValues.size(), flowFile}); - return rels; - } - - final FieldValue fieldValue = lookupFieldValues.get(0); - final String lookupKey = DataTypeUtils.toString(fieldValue.getValue(), (String) null); - final Optional lookupValue; try { - lookupValue = lookupService.lookup(lookupKey); + lookupValue = lookupService.lookup(lookupCoordinates); } catch (final Exception e) { - throw new ProcessException("Failed to lookup value '" + lookupKey + "' in Lookup Service", e); + throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e); } if (!lookupValue.isPresent()) { @@ -243,9 +301,17 @@ public class LookupRecord extends AbstractRouteRecord getFlowFileContext(final FlowFile flowFile, final ProcessContext context) { - final String lookupPathText = context.getProperty(LOOKUP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue(); - final RecordPath lookupRecordPath = recordPathCache.getCompiled(lookupPathText); + protected Tuple, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) { + final Map recordPaths = new HashMap<>(); + for (final PropertyDescriptor prop : context.getProperties().keySet()) { + if (!prop.isDynamic()) { + continue; + } + + final String pathText = context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue(); + final RecordPath lookupRecordPath = recordPathCache.getCompiled(pathText); + recordPaths.put(prop.getName(), lookupRecordPath); + } final RecordPath resultRecordPath; if (context.getProperty(RESULT_RECORD_PATH).isSet()) { @@ -255,6 +321,7 @@ public class LookupRecord extends AbstractRouteRecord(lookupRecordPath, resultRecordPath); + return new Tuple<>(recordPaths, resultRecordPath); } + } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java index d19ee43ce3..b84f518fd7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupRecord.java @@ -17,10 +17,13 @@ package org.apache.nifi.processors.standard; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; +import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.reporting.InitializationException; @@ -57,7 +60,7 @@ public class TestLookupRecord { runner.setProperty(LookupRecord.RECORD_READER, "reader"); runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); - runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name"); + runner.setProperty("lookup", "/name"); runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); @@ -145,7 +148,7 @@ public class TestLookupRecord { @Test public void testLookupPathNotFound() throws InitializationException { - runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/other"); + runner.setProperty("lookup", "/other"); runner.enqueue(""); runner.run(); @@ -197,7 +200,7 @@ public class TestLookupRecord { lookupService.addValue("Jane Doe", "Basketball"); lookupService.addValue("Jimmy Doe", "Football"); - runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/*"); + runner.setProperty("lookup", "/*"); runner.enqueue(""); runner.run(); @@ -210,6 +213,19 @@ public class TestLookupRecord { out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n"); } + @Test + public void testInvalidUnlessAllRequiredPropertiesAdded() throws InitializationException { + runner.removeProperty(new PropertyDescriptor.Builder().name("lookup").build()); + runner.setProperty("hello", "/name"); + runner.assertNotValid(); + + runner.setProperty("lookup", "xx"); + runner.assertNotValid(); + + runner.setProperty("lookup", "/name"); + runner.assertValid(); + } + private static class MapLookup extends AbstractControllerService implements StringLookupService { @@ -225,9 +241,23 @@ public class TestLookupRecord { } @Override - public Optional lookup(final String key) { + public Optional lookup(final Map coordinates) { + if (coordinates == null) { + return Optional.empty(); + } + + final String key = coordinates.get("lookup"); + if (key == null) { + return Optional.empty(); + } + return Optional.ofNullable(values.get(key)); } + + @Override + public Set getRequiredKeys() { + return Collections.singleton("lookup"); + } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java index 00258b63c8..48ec173a73 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/LookupService.java @@ -17,25 +17,36 @@ package org.apache.nifi.lookup; +import java.util.Map; import java.util.Optional; +import java.util.Set; import org.apache.nifi.controller.ControllerService; public interface LookupService extends ControllerService { /** - * Looks up a value that corresponds to the given key + * Looks up a value that corresponds to the given map of information, referred to as lookup coordinates * - * @param key the key to lookup - * @return a value that corresponds to the given key + * @param coordinates a Map of key/value pairs that indicate the information that should be looked up + * @return a value that corresponds to the given coordinates * - * @throws LookupFailureException if unable to lookup a value for the given key + * @throws LookupFailureException if unable to lookup a value for the given coordinates */ - Optional lookup(String key) throws LookupFailureException; + Optional lookup(Map coordinates) throws LookupFailureException; /** - * @return the Class that represents the type of value that will be returned by {@link #lookup(String)} + * @return the Class that represents the type of value that will be returned by {@link #lookup(Map)} */ Class getValueType(); + /** + * Many Lookup Services will require a specific set of information be passed in to the {@link #lookup(Map)} method. + * This method will return the Set of keys that must be present in the map that is passed to {@link #lookup(Map)} in order + * for the lookup to succeed. + * + * @return the keys that must be present in the map passed to {@link #lookup(Map)} in order to the lookup to succeed, or an empty set + * if no specific keys are required. + */ + Set getRequiredKeys(); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java index fee28847e6..aefb880b76 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/RecordLookupService.java @@ -17,6 +17,7 @@ package org.apache.nifi.lookup; +import java.util.Map; import java.util.Optional; import org.apache.nifi.serialization.record.Record; @@ -24,15 +25,15 @@ import org.apache.nifi.serialization.record.Record; public interface RecordLookupService extends LookupService { /** - * Returns an Optional Record that corresponds to the given key + * Returns an Optional Record that corresponds to the given coordinates * - * @param key the key to lookup - * @return an Optional Record that corresponds to the given key + * @param coordinates the coordinates to lookup + * @return an Optional Record that corresponds to the given coordinates * - * @throws LookupFailureException if unable to lookup a value for the given key + * @throws LookupFailureException if unable to lookup a value for the given coordinates */ @Override - Optional lookup(String key) throws LookupFailureException; + Optional lookup(Map coordinates) throws LookupFailureException; @Override default Class getValueType() { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java index be7d7c86ac..aa2721bdb2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java @@ -17,18 +17,19 @@ package org.apache.nifi.lookup; +import java.util.Map; import java.util.Optional; public interface StringLookupService extends LookupService { /** - * Returns an Optional value that corresponds to the given key + * Returns an Optional value that corresponds to the given coordinates * - * @param key the key to lookup - * @return an Optional String that represents the value for the given key + * @param coordinates the coordinates to lookup + * @return an Optional String that represents the value for the given coordinates */ @Override - Optional lookup(String key); + Optional lookup(Map coordinates); @Override default Class getValueType() { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java index 59f7ca67b2..4ed75b2150 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java @@ -20,7 +20,9 @@ package org.apache.nifi.lookup; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -31,8 +33,11 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @Tags({"lookup", "enrich", "key", "value"}) -@CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name.") +@CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name. " + + "The coordinates that are passed to the lookup must contain the key 'key'.") public class SimpleKeyValueLookupService extends AbstractControllerService implements StringLookupService { + private static final String KEY = "key"; + private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); private volatile Map lookupValues = new HashMap<>(); @Override @@ -52,7 +57,21 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple } @Override - public Optional lookup(final String key) { + public Optional lookup(final Map coordinates) { + if (coordinates == null) { + return Optional.empty(); + } + + final String key = coordinates.get(KEY); + if (key == null) { + return Optional.empty(); + } + return Optional.ofNullable(lookupValues.get(key)); } + + @Override + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java index 58ee4de450..096f1a68f6 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/maxmind/IPLookupService.java @@ -17,16 +17,23 @@ package org.apache.nifi.lookup.maxmind; -import com.maxmind.db.InvalidDatabaseException; -import com.maxmind.geoip2.model.AnonymousIpResponse; -import com.maxmind.geoip2.model.CityResponse; -import com.maxmind.geoip2.model.ConnectionTypeResponse; -import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; -import com.maxmind.geoip2.model.DomainResponse; -import com.maxmind.geoip2.model.IspResponse; -import com.maxmind.geoip2.record.Country; -import com.maxmind.geoip2.record.Location; -import com.maxmind.geoip2.record.Subdivision; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.commons.codec.digest.DigestUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -42,28 +49,30 @@ import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.util.StopWatch; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import com.maxmind.db.InvalidDatabaseException; +import com.maxmind.geoip2.model.AnonymousIpResponse; +import com.maxmind.geoip2.model.CityResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse; +import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; +import com.maxmind.geoip2.model.DomainResponse; +import com.maxmind.geoip2.model.IspResponse; +import com.maxmind.geoip2.record.Country; +import com.maxmind.geoip2.record.Location; +import com.maxmind.geoip2.record.Subdivision; + @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"}) @CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind " - + "Database file and specifying which types of enrichment should be provided for an IP Address. Each type of enrichment is a separate lookup, so configuring the " - + "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component " + + "Database file and specifying which types of enrichment should be provided for an IP Address or Hostname. Each type of enrichment is a separate lookup, so configuring the " + + "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. In order to use this service, a lookup " + + "must be performed using key of 'ip' and a value that is a valid IP address or hostname. View the Usage of this component " + "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.") public class IPLookupService extends AbstractControllerService implements RecordLookupService { private volatile String databaseFile = null; + private static final String IP_KEY = "ip"; + private static final Set REQUIRED_KEYS = Stream.of(IP_KEY).collect(Collectors.toSet()); + private volatile DatabaseReader databaseReader = null; private volatile String databaseChecksum = null; private volatile long databaseLastRefreshAttempt = -1; @@ -175,8 +184,13 @@ public class IPLookupService extends AbstractControllerService implements Record } @Override - public Optional lookup(final String key) throws LookupFailureException { - if (key == null) { + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } + + @Override + public Optional lookup(final Map coordinates) throws LookupFailureException { + if (coordinates == null) { return Optional.empty(); } @@ -193,7 +207,7 @@ public class IPLookupService extends AbstractControllerService implements Record // InvalidDatabaseException, so force a reload and then retry the lookup one time, if we still get an error then throw it try { final DatabaseReader databaseReader = this.databaseReader; - return doLookup(databaseReader, key); + return doLookup(databaseReader, coordinates); } catch (InvalidDatabaseException idbe) { if (dbWriteLock.tryLock()) { try { @@ -210,7 +224,7 @@ public class IPLookupService extends AbstractControllerService implements Record getLogger().debug("Attempting to retry lookup after InvalidDatabaseException"); try { final DatabaseReader databaseReader = this.databaseReader; - return doLookup(databaseReader, key); + return doLookup(databaseReader, coordinates); } catch (final Exception e) { throw new LookupFailureException("Error performing look up: " + e.getMessage(), e); } @@ -218,18 +232,23 @@ public class IPLookupService extends AbstractControllerService implements Record dbWriteLock.unlock(); } } else { - throw new LookupFailureException("Failed to lookup the key " + key + " due to " + idbe.getMessage(), idbe); + throw new LookupFailureException("Failed to lookup a value for " + coordinates + " due to " + idbe.getMessage(), idbe); } } } - private Optional doLookup(final DatabaseReader databaseReader, final String key) throws LookupFailureException, InvalidDatabaseException { + private Optional doLookup(final DatabaseReader databaseReader, final Map coordinates) throws LookupFailureException, InvalidDatabaseException { + final String ipAddress = coordinates.get(IP_KEY); + if (ipAddress == null) { + return Optional.empty(); + } + final InetAddress inetAddress; try { - inetAddress = InetAddress.getByName(key); + inetAddress = InetAddress.getByName(ipAddress); } catch (final IOException ioe) { getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " + - "providing the service with an invalid IP address", new Object[] {key}, ioe); + "providing the service with an invalid IP address", new Object[] {coordinates}, ioe); return Optional.empty(); }