NIFI-3946: Update LookupService to take a Map instead of a String for the input

NIFI-3946: Fixed issues where null values were returned instead of empty optionals

This closes #1833.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Mark Payne 2017-05-19 16:42:51 -04:00 committed by Bryan Bende
parent 71cd497fef
commit 7f8987471d
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
10 changed files with 266 additions and 101 deletions

View File

@ -46,7 +46,9 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
@ -64,9 +66,14 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
private volatile File kerberosServiceKeytab = null; private volatile File kerberosServiceKeytab = null;
@Override @Override
public Optional<Object> lookup(String key) throws LookupFailureException { public Optional<Object> lookup(Map<String, String> coordinates) throws LookupFailureException {
// Delegate the lookup() call to the scripted LookupService // Delegate the lookup() call to the scripted LookupService
return lookupService.get().lookup(key); return lookupService.get().lookup(coordinates);
}
@Override
public Set<String> getRequiredKeys() {
return lookupService.get().getRequiredKeys();
} }
@Override @Override
@ -177,6 +184,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
} }
} }
@Override
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) { public void onEnabled(final ConfigurationContext context) {
synchronized (scriptingComponentHelper.isInitialized) { synchronized (scriptingComponentHelper.isInitialized) {
@ -236,6 +244,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
} }
} }
@Override
public void setup() { public void setup() {
// Create a single script engine, the Processor object is reused by each task // Create a single script engine, the Processor object is reused by each task
if (scriptEngine == null) { if (scriptEngine == null) {
@ -263,6 +272,7 @@ public class ScriptedLookupService extends AbstractScriptedControllerService imp
* @param scriptBody An input stream associated with the script content * @param scriptBody An input stream associated with the script content
* @return Whether the script was successfully reloaded * @return Whether the script was successfully reloaded
*/ */
@Override
protected boolean reloadScript(final String scriptBody) { protected boolean reloadScript(final String scriptBody) {
// note we are starting here with a fresh listing of validation // note we are starting here with a fresh listing of validation
// results since we are (re)loading a new/updated script. any // results since we are (re)loading a new/updated script. any

View File

@ -92,13 +92,13 @@ class TestScriptedLookupService {
MockFlowFile mockFlowFile = new MockFlowFile(1L) MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
Optional opt = scriptedLookupService.lookup('Hello') Optional opt = scriptedLookupService.lookup(['key':'Hello'])
assertTrue(opt.present) assertTrue(opt.present)
assertEquals('Hi', opt.get()) assertEquals('Hi', opt.get())
opt = scriptedLookupService.lookup('World') opt = scriptedLookupService.lookup(['key':'World'])
assertTrue(opt.present) assertTrue(opt.present)
assertEquals('there', opt.get()) assertEquals('there', opt.get())
opt = scriptedLookupService.lookup('Not There') opt = scriptedLookupService.lookup(['key':'Not There'])
assertFalse(opt.present) assertFalse(opt.present)
} }

View File

@ -15,6 +15,8 @@
* limitations under the License. * limitations under the License.
*/ */
import java.util.Set
import org.apache.nifi.controller.ControllerServiceInitializationContext import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.reporting.InitializationException import org.apache.nifi.reporting.InitializationException
@ -28,10 +30,15 @@ class GroovyLookupService implements LookupService<String> {
@Override @Override
Optional<String> lookup(String key) { Optional<String> lookup(Map<String, String> coordinates) {
final String key = coordinates.values().iterator().next();
Optional.ofNullable(lookupTable[key]) Optional.ofNullable(lookupTable[key])
} }
Set<String> getRequiredKeys() {
return java.util.Collections.emptySet();
}
@Override @Override
Class<?> getValueType() { Class<?> getValueType() {
return String return String

View File

@ -19,26 +19,31 @@ package org.apache.nifi.processors.standard;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.lookup.LookupService; import org.apache.nifi.lookup.LookupService;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
@ -64,18 +69,19 @@ import org.apache.nifi.util.Tuple;
@WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile")
}) })
@Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"}) @Tags({"lookup", "enrichment", "route", "record", "csv", "json", "avro", "logs", "convert", "filter"})
@CapabilityDescription("Extracts a field from a Record and looks up its value in a LookupService. If a result is returned by the LookupService, " @CapabilityDescription("Extracts one or more fields from a Record and looks up a value for those fields in a LookupService. If a result is returned by the LookupService, "
+ "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then " + "that result is optionally added to the Record. In this case, the processor functions as an Enrichment processor. Regardless, the Record is then "
+ "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), " + "routed to either the 'matched' relationship or 'unmatched' relationship (if the 'Routing Strategy' property is configured to do so), "
+ "indicating whether or not a result was returned by the LookupService, " + "indicating whether or not a result was returned by the LookupService, allowing the processor to also function as a Routing processor. "
+ "allowing the processor to also function as a Routing processor. If any record in the incoming FlowFile has multiple fields match the configured " + "The \"coordinates\" to use for looking up a value in the Lookup Service are defined by adding a user-defined property. Each property that is added will have an entry added "
+ "Lookup RecordPath or if no fields match, then that record will be routed to 'unmatched' (or 'success', depending on the configuration of the 'Routing Strategy' property). " + "to a Map, where the name of the property becomes the Map Key and the value returned by the RecordPath becomes the value for that key. If multiple values are returned by the "
+ "RecordPath, then the Record will be routed to the 'unmatched' relationship (or 'success', depending on the 'Routing Strategy' property's configuration). "
+ "If one or more fields match the Result RecordPath, all fields " + "If one or more fields match the Result RecordPath, all fields "
+ "that match will be updated. If there is no match in the configured LookupService, then no fields will be updated. I.e., it will not overwrite an existing value in the Record " + "that match will be updated. If there is no match in the configured LookupService, then no fields will be updated. I.e., it will not overwrite an existing value in the Record "
+ "with a null value. Please note, however, that if the results returned by the LookupService are not accounted for in your schema (specifically, " + "with a null value. Please note, however, that if the results returned by the LookupService are not accounted for in your schema (specifically, "
+ "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.") + "the schema that is configured for your Record Writer) then the fields will not be written out to the FlowFile.")
@SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"}) @SeeAlso(value = {ConvertRecord.class, SplitRecord.class}, classNames = {"org.apache.nifi.lookup.SimpleKeyValueLookupService", "org.apache.nifi.lookup.maxmind.IPLookupService"})
public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPath>> { public class LookupRecord extends AbstractRouteRecord<Tuple<Map<String, RecordPath>, RecordPath>> {
private volatile RecordPathCache recordPathCache = new RecordPathCache(25); private volatile RecordPathCache recordPathCache = new RecordPathCache(25);
private volatile LookupService<?> lookupService; private volatile LookupService<?> lookupService;
@ -94,15 +100,6 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
.required(true) .required(true)
.build(); .build();
static final PropertyDescriptor LOOKUP_RECORD_PATH = new PropertyDescriptor.Builder()
.name("lookup-record-path")
.displayName("Lookup RecordPath")
.description("A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(true)
.required(true)
.build();
static final PropertyDescriptor RESULT_RECORD_PATH = new PropertyDescriptor.Builder() static final PropertyDescriptor RESULT_RECORD_PATH = new PropertyDescriptor.Builder()
.name("result-record-path") .name("result-record-path")
.displayName("Result RecordPath") .displayName("Result RecordPath")
@ -159,12 +156,63 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
final List<PropertyDescriptor> properties = new ArrayList<>(); final List<PropertyDescriptor> properties = new ArrayList<>();
properties.addAll(super.getSupportedPropertyDescriptors()); properties.addAll(super.getSupportedPropertyDescriptors());
properties.add(LOOKUP_SERVICE); properties.add(LOOKUP_SERVICE);
properties.add(LOOKUP_RECORD_PATH);
properties.add(RESULT_RECORD_PATH); properties.add(RESULT_RECORD_PATH);
properties.add(ROUTING_STRATEGY); properties.add(ROUTING_STRATEGY);
return properties; return properties;
} }
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("A RecordPath that points to the field whose value will be looked up in the configured Lookup Service")
.addValidator(new RecordPathValidator())
.expressionLanguageSupported(true)
.required(false)
.dynamic(true)
.build();
}
@Override
@SuppressWarnings("unchecked")
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Set<String> dynamicPropNames = validationContext.getProperties().keySet().stream()
.filter(prop -> prop.isDynamic())
.map(prop -> prop.getName())
.collect(Collectors.toSet());
if (dynamicPropNames.isEmpty()) {
return Collections.singleton(new ValidationResult.Builder()
.subject("User-Defined Properties")
.valid(false)
.explanation("At least one user-defined property must be specified.")
.build());
}
final Set<String> requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
final Set<String> missingKeys = requiredKeys.stream()
.filter(key -> !dynamicPropNames.contains(key))
.collect(Collectors.toSet());
if (!missingKeys.isEmpty()) {
final List<ValidationResult> validationResults = new ArrayList<>();
for (final String missingKey : missingKeys) {
final ValidationResult result = new ValidationResult.Builder()
.subject(missingKey)
.valid(false)
.explanation("The configured Lookup Services requires that a key be provided with the name '" + missingKey
+ "'. Please add a new property to this Processor with a name '" + missingKey
+ "' and provide a RecordPath that can be used to retrieve the appropriate value.")
.build();
validationResults.add(result);
}
return validationResults;
}
return Collections.emptyList();
}
@Override @Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (ROUTING_STRATEGY.equals(descriptor)) { if (ROUTING_STRATEGY.equals(descriptor)) {
@ -189,33 +237,43 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
@Override @Override
protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context, protected Set<Relationship> route(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context,
final Tuple<RecordPath, RecordPath> flowFileContext) { final Tuple<Map<String, RecordPath>, RecordPath> flowFileContext) {
final RecordPathResult lookupPathResult = flowFileContext.getKey().evaluate(record); final Map<String, RecordPath> recordPaths = flowFileContext.getKey();
final List<FieldValue> lookupFieldValues = lookupPathResult.getSelectedFields() final Map<String, String> lookupCoordinates = new HashMap<>(recordPaths.size());
for (final Map.Entry<String, RecordPath> entry : recordPaths.entrySet()) {
final String coordinateKey = entry.getKey();
final RecordPath recordPath = entry.getValue();
final RecordPathResult pathResult = recordPath.evaluate(record);
final List<FieldValue> lookupFieldValues = pathResult.getSelectedFields()
.filter(fieldVal -> fieldVal.getValue() != null) .filter(fieldVal -> fieldVal.getValue() != null)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (lookupFieldValues.isEmpty()) { if (lookupFieldValues.isEmpty()) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("Lookup RecordPath did not match any fields in a record for {}; routing record to " + rels, new Object[] {flowFile}); getLogger().debug("RecordPath for property '{}' did not match any fields in a record for {}; routing record to {}", new Object[] {coordinateKey, flowFile, rels});
return rels; return rels;
} }
if (lookupFieldValues.size() > 1) { if (lookupFieldValues.size() > 1) {
final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION; final Set<Relationship> rels = routeToMatchedUnmatched ? UNMATCHED_COLLECTION : SUCCESS_COLLECTION;
getLogger().debug("Lookup RecordPath matched {} fields in a record for {}; routing record to " + rels, new Object[] {lookupFieldValues.size(), flowFile}); getLogger().debug("RecordPath for property '{}' matched {} fields in a record for {}; routing record to {}",
new Object[] {coordinateKey, lookupFieldValues.size(), flowFile, rels});
return rels; return rels;
} }
final FieldValue fieldValue = lookupFieldValues.get(0); final FieldValue fieldValue = lookupFieldValues.get(0);
final String lookupKey = DataTypeUtils.toString(fieldValue.getValue(), (String) null); final String coordinateValue = DataTypeUtils.toString(fieldValue.getValue(), (String) null);
lookupCoordinates.put(coordinateKey, coordinateValue);
}
final Optional<?> lookupValue; final Optional<?> lookupValue;
try { try {
lookupValue = lookupService.lookup(lookupKey); lookupValue = lookupService.lookup(lookupCoordinates);
} catch (final Exception e) { } catch (final Exception e) {
throw new ProcessException("Failed to lookup value '" + lookupKey + "' in Lookup Service", e); throw new ProcessException("Failed to lookup coordinates " + lookupCoordinates + " in Lookup Service", e);
} }
if (!lookupValue.isPresent()) { if (!lookupValue.isPresent()) {
@ -243,9 +301,17 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
} }
@Override @Override
protected Tuple<RecordPath, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) { protected Tuple<Map<String, RecordPath>, RecordPath> getFlowFileContext(final FlowFile flowFile, final ProcessContext context) {
final String lookupPathText = context.getProperty(LOOKUP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue(); final Map<String, RecordPath> recordPaths = new HashMap<>();
final RecordPath lookupRecordPath = recordPathCache.getCompiled(lookupPathText); for (final PropertyDescriptor prop : context.getProperties().keySet()) {
if (!prop.isDynamic()) {
continue;
}
final String pathText = context.getProperty(prop).evaluateAttributeExpressions(flowFile).getValue();
final RecordPath lookupRecordPath = recordPathCache.getCompiled(pathText);
recordPaths.put(prop.getName(), lookupRecordPath);
}
final RecordPath resultRecordPath; final RecordPath resultRecordPath;
if (context.getProperty(RESULT_RECORD_PATH).isSet()) { if (context.getProperty(RESULT_RECORD_PATH).isSet()) {
@ -255,6 +321,7 @@ public class LookupRecord extends AbstractRouteRecord<Tuple<RecordPath, RecordPa
resultRecordPath = null; resultRecordPath = null;
} }
return new Tuple<>(lookupRecordPath, resultRecordPath); return new Tuple<>(recordPaths, resultRecordPath);
} }
} }

View File

@ -17,10 +17,13 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.lookup.StringLookupService; import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
@ -57,7 +60,7 @@ public class TestLookupRecord {
runner.setProperty(LookupRecord.RECORD_READER, "reader"); runner.setProperty(LookupRecord.RECORD_READER, "reader");
runner.setProperty(LookupRecord.RECORD_WRITER, "writer"); runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup"); runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/name"); runner.setProperty("lookup", "/name");
runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport"); runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED); runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
@ -145,7 +148,7 @@ public class TestLookupRecord {
@Test @Test
public void testLookupPathNotFound() throws InitializationException { public void testLookupPathNotFound() throws InitializationException {
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/other"); runner.setProperty("lookup", "/other");
runner.enqueue(""); runner.enqueue("");
runner.run(); runner.run();
@ -197,7 +200,7 @@ public class TestLookupRecord {
lookupService.addValue("Jane Doe", "Basketball"); lookupService.addValue("Jane Doe", "Basketball");
lookupService.addValue("Jimmy Doe", "Football"); lookupService.addValue("Jimmy Doe", "Football");
runner.setProperty(LookupRecord.LOOKUP_RECORD_PATH, "/*"); runner.setProperty("lookup", "/*");
runner.enqueue(""); runner.enqueue("");
runner.run(); runner.run();
@ -210,6 +213,19 @@ public class TestLookupRecord {
out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n"); out.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
} }
@Test
public void testInvalidUnlessAllRequiredPropertiesAdded() throws InitializationException {
runner.removeProperty(new PropertyDescriptor.Builder().name("lookup").build());
runner.setProperty("hello", "/name");
runner.assertNotValid();
runner.setProperty("lookup", "xx");
runner.assertNotValid();
runner.setProperty("lookup", "/name");
runner.assertValid();
}
private static class MapLookup extends AbstractControllerService implements StringLookupService { private static class MapLookup extends AbstractControllerService implements StringLookupService {
@ -225,9 +241,23 @@ public class TestLookupRecord {
} }
@Override @Override
public Optional<String> lookup(final String key) { public Optional<String> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
return Optional.empty();
}
final String key = coordinates.get("lookup");
if (key == null) {
return Optional.empty();
}
return Optional.ofNullable(values.get(key)); return Optional.ofNullable(values.get(key));
} }
@Override
public Set<String> getRequiredKeys() {
return Collections.singleton("lookup");
}
} }
} }

View File

@ -17,25 +17,36 @@
package org.apache.nifi.lookup; package org.apache.nifi.lookup;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import org.apache.nifi.controller.ControllerService; import org.apache.nifi.controller.ControllerService;
public interface LookupService<T> extends ControllerService { public interface LookupService<T> extends ControllerService {
/** /**
* Looks up a value that corresponds to the given key * Looks up a value that corresponds to the given map of information, referred to as lookup coordinates
* *
* @param key the key to lookup * @param coordinates a Map of key/value pairs that indicate the information that should be looked up
* @return a value that corresponds to the given key * @return a value that corresponds to the given coordinates
* *
* @throws LookupFailureException if unable to lookup a value for the given key * @throws LookupFailureException if unable to lookup a value for the given coordinates
*/ */
Optional<T> lookup(String key) throws LookupFailureException; Optional<T> lookup(Map<String, String> coordinates) throws LookupFailureException;
/** /**
* @return the Class that represents the type of value that will be returned by {@link #lookup(String)} * @return the Class that represents the type of value that will be returned by {@link #lookup(Map)}
*/ */
Class<?> getValueType(); Class<?> getValueType();
/**
* Many Lookup Services will require a specific set of information be passed in to the {@link #lookup(Map)} method.
* This method will return the Set of keys that must be present in the map that is passed to {@link #lookup(Map)} in order
* for the lookup to succeed.
*
* @return the keys that must be present in the map passed to {@link #lookup(Map)} in order to the lookup to succeed, or an empty set
* if no specific keys are required.
*/
Set<String> getRequiredKeys();
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.lookup; package org.apache.nifi.lookup;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
@ -24,15 +25,15 @@ import org.apache.nifi.serialization.record.Record;
public interface RecordLookupService extends LookupService<Record> { public interface RecordLookupService extends LookupService<Record> {
/** /**
* Returns an Optional Record that corresponds to the given key * Returns an Optional Record that corresponds to the given coordinates
* *
* @param key the key to lookup * @param coordinates the coordinates to lookup
* @return an Optional Record that corresponds to the given key * @return an Optional Record that corresponds to the given coordinates
* *
* @throws LookupFailureException if unable to lookup a value for the given key * @throws LookupFailureException if unable to lookup a value for the given coordinates
*/ */
@Override @Override
Optional<Record> lookup(String key) throws LookupFailureException; Optional<Record> lookup(Map<String, String> coordinates) throws LookupFailureException;
@Override @Override
default Class<?> getValueType() { default Class<?> getValueType() {

View File

@ -17,18 +17,19 @@
package org.apache.nifi.lookup; package org.apache.nifi.lookup;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
public interface StringLookupService extends LookupService<String> { public interface StringLookupService extends LookupService<String> {
/** /**
* Returns an Optional value that corresponds to the given key * Returns an Optional value that corresponds to the given coordinates
* *
* @param key the key to lookup * @param coordinates the coordinates to lookup
* @return an Optional String that represents the value for the given key * @return an Optional String that represents the value for the given coordinates
*/ */
@Override @Override
Optional<String> lookup(String key); Optional<String> lookup(Map<String, String> coordinates);
@Override @Override
default Class<?> getValueType() { default Class<?> getValueType() {

View File

@ -20,7 +20,9 @@ package org.apache.nifi.lookup;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -31,8 +33,11 @@ import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
@Tags({"lookup", "enrich", "key", "value"}) @Tags({"lookup", "enrich", "key", "value"})
@CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name.") @CapabilityDescription("Allows users to add key/value pairs as User-defined Properties. Each property that is added can be looked up by Property Name. "
+ "The coordinates that are passed to the lookup must contain the key 'key'.")
public class SimpleKeyValueLookupService extends AbstractControllerService implements StringLookupService { public class SimpleKeyValueLookupService extends AbstractControllerService implements StringLookupService {
private static final String KEY = "key";
private static final Set<String> REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet());
private volatile Map<String, String> lookupValues = new HashMap<>(); private volatile Map<String, String> lookupValues = new HashMap<>();
@Override @Override
@ -52,7 +57,21 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple
} }
@Override @Override
public Optional<String> lookup(final String key) { public Optional<String> lookup(final Map<String, String> coordinates) {
if (coordinates == null) {
return Optional.empty();
}
final String key = coordinates.get(KEY);
if (key == null) {
return Optional.empty();
}
return Optional.ofNullable(lookupValues.get(key)); return Optional.ofNullable(lookupValues.get(key));
} }
@Override
public Set<String> getRequiredKeys() {
return REQUIRED_KEYS;
}
} }

View File

@ -17,16 +17,23 @@
package org.apache.nifi.lookup.maxmind; package org.apache.nifi.lookup.maxmind;
import com.maxmind.db.InvalidDatabaseException; import java.io.File;
import com.maxmind.geoip2.model.AnonymousIpResponse; import java.io.FileInputStream;
import com.maxmind.geoip2.model.CityResponse; import java.io.IOException;
import com.maxmind.geoip2.model.ConnectionTypeResponse; import java.io.InputStream;
import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType; import java.net.InetAddress;
import com.maxmind.geoip2.model.DomainResponse; import java.util.ArrayList;
import com.maxmind.geoip2.model.IspResponse; import java.util.HashMap;
import com.maxmind.geoip2.record.Country; import java.util.List;
import com.maxmind.geoip2.record.Location; import java.util.Map;
import com.maxmind.geoip2.record.Subdivision; import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.DigestUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
@ -42,28 +49,30 @@ import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StopWatch;
import java.io.File; import com.maxmind.db.InvalidDatabaseException;
import java.io.FileInputStream; import com.maxmind.geoip2.model.AnonymousIpResponse;
import java.io.IOException; import com.maxmind.geoip2.model.CityResponse;
import java.io.InputStream; import com.maxmind.geoip2.model.ConnectionTypeResponse;
import java.net.InetAddress; import com.maxmind.geoip2.model.ConnectionTypeResponse.ConnectionType;
import java.util.ArrayList; import com.maxmind.geoip2.model.DomainResponse;
import java.util.HashMap; import com.maxmind.geoip2.model.IspResponse;
import java.util.List; import com.maxmind.geoip2.record.Country;
import java.util.Map; import com.maxmind.geoip2.record.Location;
import java.util.Optional; import com.maxmind.geoip2.record.Subdivision;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"}) @Tags({"lookup", "enrich", "ip", "geo", "ipgeo", "maxmind", "isp", "domain", "cellular", "anonymous", "tor"})
@CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind " @CapabilityDescription("A lookup service that provides several types of enrichment information for IP addresses. The service is configured by providing a MaxMind "
+ "Database file and specifying which types of enrichment should be provided for an IP Address. Each type of enrichment is a separate lookup, so configuring the " + "Database file and specifying which types of enrichment should be provided for an IP Address or Hostname. Each type of enrichment is a separate lookup, so configuring the "
+ "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. View the Usage of this component " + "service to provide all of the available enrichment data may be slower than returning only a portion of the available enrichments. In order to use this service, a lookup "
+ "must be performed using key of 'ip' and a value that is a valid IP address or hostname. View the Usage of this component "
+ "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.") + "and choose to view Additional Details for more information, such as the Schema that pertains to the information that is returned.")
public class IPLookupService extends AbstractControllerService implements RecordLookupService { public class IPLookupService extends AbstractControllerService implements RecordLookupService {
private volatile String databaseFile = null; private volatile String databaseFile = null;
private static final String IP_KEY = "ip";
private static final Set<String> REQUIRED_KEYS = Stream.of(IP_KEY).collect(Collectors.toSet());
private volatile DatabaseReader databaseReader = null; private volatile DatabaseReader databaseReader = null;
private volatile String databaseChecksum = null; private volatile String databaseChecksum = null;
private volatile long databaseLastRefreshAttempt = -1; private volatile long databaseLastRefreshAttempt = -1;
@ -175,8 +184,13 @@ public class IPLookupService extends AbstractControllerService implements Record
} }
@Override @Override
public Optional<Record> lookup(final String key) throws LookupFailureException { public Set<String> getRequiredKeys() {
if (key == null) { return REQUIRED_KEYS;
}
@Override
public Optional<Record> lookup(final Map<String, String> coordinates) throws LookupFailureException {
if (coordinates == null) {
return Optional.empty(); return Optional.empty();
} }
@ -193,7 +207,7 @@ public class IPLookupService extends AbstractControllerService implements Record
// InvalidDatabaseException, so force a reload and then retry the lookup one time, if we still get an error then throw it // InvalidDatabaseException, so force a reload and then retry the lookup one time, if we still get an error then throw it
try { try {
final DatabaseReader databaseReader = this.databaseReader; final DatabaseReader databaseReader = this.databaseReader;
return doLookup(databaseReader, key); return doLookup(databaseReader, coordinates);
} catch (InvalidDatabaseException idbe) { } catch (InvalidDatabaseException idbe) {
if (dbWriteLock.tryLock()) { if (dbWriteLock.tryLock()) {
try { try {
@ -210,7 +224,7 @@ public class IPLookupService extends AbstractControllerService implements Record
getLogger().debug("Attempting to retry lookup after InvalidDatabaseException"); getLogger().debug("Attempting to retry lookup after InvalidDatabaseException");
try { try {
final DatabaseReader databaseReader = this.databaseReader; final DatabaseReader databaseReader = this.databaseReader;
return doLookup(databaseReader, key); return doLookup(databaseReader, coordinates);
} catch (final Exception e) { } catch (final Exception e) {
throw new LookupFailureException("Error performing look up: " + e.getMessage(), e); throw new LookupFailureException("Error performing look up: " + e.getMessage(), e);
} }
@ -218,18 +232,23 @@ public class IPLookupService extends AbstractControllerService implements Record
dbWriteLock.unlock(); dbWriteLock.unlock();
} }
} else { } else {
throw new LookupFailureException("Failed to lookup the key " + key + " due to " + idbe.getMessage(), idbe); throw new LookupFailureException("Failed to lookup a value for " + coordinates + " due to " + idbe.getMessage(), idbe);
} }
} }
} }
private Optional<Record> doLookup(final DatabaseReader databaseReader, final String key) throws LookupFailureException, InvalidDatabaseException { private Optional<Record> doLookup(final DatabaseReader databaseReader, final Map<String, String> coordinates) throws LookupFailureException, InvalidDatabaseException {
final String ipAddress = coordinates.get(IP_KEY);
if (ipAddress == null) {
return Optional.empty();
}
final InetAddress inetAddress; final InetAddress inetAddress;
try { try {
inetAddress = InetAddress.getByName(key); inetAddress = InetAddress.getByName(ipAddress);
} catch (final IOException ioe) { } catch (final IOException ioe) {
getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " + getLogger().warn("Could not resolve the IP for value '{}'. This is usually caused by issue resolving the appropriate DNS record or " +
"providing the service with an invalid IP address", new Object[] {key}, ioe); "providing the service with an invalid IP address", new Object[] {coordinates}, ioe);
return Optional.empty(); return Optional.empty();
} }