diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index e7bfbabdf0..69129a50f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -1,13 +1,13 @@
-
4.0.0
@@ -255,6 +255,11 @@
nifi-schema-registry-service-api
test
+
+ org.apache.nifi
+ nifi-lookup-services
+ test
+
org.apache.derby
derby
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
new file mode 100644
index 0000000000..1e0a674306
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LookupAttribute.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.lookup.LookupFailureException;
+import org.apache.nifi.lookup.LookupService;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"lookup", "cache", "enrich", "join", "attributes", "Attribute Expression Language"})
+@CapabilityDescription("Lookup attributes from a lookup service")
+@DynamicProperty(name = "The name of the attribute to add to the FlowFile",
+ value = "The name of the key or property to retrieve from the lookup service",
+ supportsExpressionLanguage = true,
+ description = "Adds a FlowFile attribute specified by the dynamic property's key with the value found in the lookup service using the the dynamic property's value")
+public class LookupAttribute extends AbstractProcessor {
+
+ public static final PropertyDescriptor LOOKUP_SERVICE =
+ new PropertyDescriptor.Builder()
+ .name("lookup-service")
+ .displayName("Lookup Service")
+ .description("The lookup service to use for attribute lookups")
+ .identifiesControllerService(StringLookupService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_EMPTY_VALUES =
+ new PropertyDescriptor.Builder()
+ .name("include-empty-values")
+ .displayName("Include Empty Values")
+ .description("Include null or blank values for keys that are null or blank")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ public static final Relationship REL_MATCHED = new Relationship.Builder()
+ .description("FlowFiles with matching lookups are routed to this relationship")
+ .name("matched")
+ .build();
+
+ public static final Relationship REL_UNMATCHED = new Relationship.Builder()
+ .description("FlowFiles with missing lookups are routed to this relationship")
+ .name("unmatched")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .description("FlowFiles with failing lookups are routed to this relationship")
+ .name("failure")
+ .build();
+
+ private List descriptors;
+
+ private Set relationships;
+
+ private Map dynamicProperties;
+
+ @Override
+ protected Collection customValidate(ValidationContext validationContext) {
+ final List errors = new ArrayList<>(super.customValidate(validationContext));
+
+ final Set dynamicProperties = validationContext.getProperties().keySet().stream()
+ .filter(prop -> prop.isDynamic())
+ .collect(Collectors.toSet());
+
+ if (dynamicProperties == null || dynamicProperties.size() < 1) {
+ errors.add(new ValidationResult.Builder()
+ .subject("User-Defined Properties")
+ .valid(false)
+ .explanation("At least one user-defined property must be specified.")
+ .build());
+ }
+
+ final Set requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
+ if (requiredKeys == null || requiredKeys.size() != 1) {
+ errors.add(new ValidationResult.Builder()
+ .subject(LOOKUP_SERVICE.getDisplayName())
+ .valid(false)
+ .explanation("LookupAttribute requires a key-value lookup service supporting exactly one required key.")
+ .build());
+ }
+
+ return errors;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false)
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ public Set getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List descriptors = new ArrayList();
+ descriptors.add(LOOKUP_SERVICE);
+ descriptors.add(INCLUDE_EMPTY_VALUES);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set relationships = new HashSet();
+ relationships.add(REL_MATCHED);
+ relationships.add(REL_UNMATCHED);
+ relationships.add(REL_FAILURE);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ // Load up all the dynamic properties once for use later in onTrigger
+ final Map dynamicProperties = new HashMap<>();
+ for (final Map.Entry e : context.getProperties().entrySet()) {
+ final PropertyDescriptor descriptor = e.getKey();
+ if (descriptor.isDynamic()) {
+ final PropertyValue value = context.getProperty(descriptor);
+ dynamicProperties.put(descriptor, value);
+ }
+ }
+ this.dynamicProperties = Collections.unmodifiableMap(dynamicProperties);
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ final ComponentLog logger = getLogger();
+ final LookupService lookupService = context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class);
+ final boolean includeEmptyValues = context.getProperty(INCLUDE_EMPTY_VALUES).asBoolean();
+ for (FlowFile flowFile : session.get(50)) {
+ try {
+ onTrigger(logger, lookupService, includeEmptyValues, flowFile, session);
+ } catch (final IOException e) {
+ throw new ProcessException(e.getMessage(), e);
+ }
+ }
+ }
+
+ private void onTrigger(ComponentLog logger, LookupService lookupService,
+ boolean includeEmptyValues, FlowFile flowFile, ProcessSession session)
+ throws ProcessException, IOException {
+
+ final Map attributes = new HashMap<>(flowFile.getAttributes());
+
+ boolean matched = false;
+ try {
+ final Set requiredKeys = lookupService.getRequiredKeys();
+ if (requiredKeys == null || requiredKeys.size() != 1) {
+ throw new ProcessException("LookupAttribute requires a key-value lookup service supporting exactly one required key, was: " +
+ (requiredKeys == null ? "null" : String.valueOf(requiredKeys.size())));
+ }
+
+ final String coordinateKey = requiredKeys.iterator().next();
+ for (final Map.Entry e : dynamicProperties.entrySet()) {
+ final PropertyValue lookupKeyExpression = e.getValue();
+ final String lookupKey = lookupKeyExpression.evaluateAttributeExpressions(flowFile).getValue();
+ final String attributeName = e.getKey().getName();
+ final Optional attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey));
+ matched = putAttribute(attributeName, attributeValue, attributes, includeEmptyValues, logger) || matched;
+
+ if (!matched && logger.isDebugEnabled()) {
+ logger.debug("No such value for key: {}", new Object[]{lookupKey});
+ }
+ }
+ } catch (final LookupFailureException e) {
+ logger.error(e.getMessage(), e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ session.transfer(flowFile, matched ? REL_MATCHED : REL_UNMATCHED);
+ }
+
+ private boolean putAttribute(final String attributeName, final Optional attributeValue, final Map attributes, final boolean includeEmptyValues, final ComponentLog logger) {
+ boolean matched = false;
+ if (attributeValue.isPresent() && StringUtils.isNotBlank(attributeValue.get())) {
+ attributes.put(attributeName, attributeValue.get());
+ matched = true;
+ } else if (includeEmptyValues) {
+ attributes.put(attributeName, attributeValue.isPresent() ? "" : "null");
+ matched = true;
+ }
+ return matched;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 1034384e44..09681da3f2 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -59,6 +59,7 @@ org.apache.nifi.processors.standard.ListenUDP
org.apache.nifi.processors.standard.ListSFTP
org.apache.nifi.processors.standard.LogAttribute
org.apache.nifi.processors.standard.LogMessage
+org.apache.nifi.processors.standard.LookupAttribute
org.apache.nifi.processors.standard.LookupRecord
org.apache.nifi.processors.standard.MergeContent
org.apache.nifi.processors.standard.ModifyBytes
@@ -105,4 +106,4 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache
org.apache.nifi.processors.standard.ListFTP
org.apache.nifi.processors.standard.FetchFTP
org.apache.nifi.processors.standard.UpdateCounter
-org.apache.nifi.processors.standard.UpdateRecord
\ No newline at end of file
+org.apache.nifi.processors.standard.UpdateRecord
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
new file mode 100644
index 0000000000..ce568abce3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLookupAttribute.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.lookup.SimpleKeyValueLookupService;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+public class TestLookupAttribute {
+
+ @Test
+ public void testKeyValueLookupAttribute() throws InitializationException {
+ final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
+
+ final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
+ runner.addControllerService("simple-key-value-lookup-service", service);
+ runner.setProperty(service, "key1", "value1");
+ runner.setProperty(service, "key2", "value2");
+ runner.setProperty(service, "key3", "value3");
+ runner.setProperty(service, "key4", " ");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
+ runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "true");
+ runner.setProperty("foo", "key1");
+ runner.setProperty("bar", "key2");
+ runner.setProperty("baz", "${attr1}");
+ runner.setProperty("qux", "key4");
+ runner.setProperty("zab", "key5");
+ runner.assertValid();
+
+ final Map attributes = new HashMap<>();
+ attributes.put("attr1", "key3");
+
+ runner.enqueue("some content".getBytes(), attributes);
+ runner.run(1, false);
+ runner.assertAllFlowFilesTransferred(LookupAttribute.REL_MATCHED, 1);
+
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(LookupAttribute.REL_MATCHED).get(0);
+
+ assertNotNull(flowFile);
+
+ flowFile.assertAttributeExists("foo");
+ flowFile.assertAttributeExists("bar");
+ flowFile.assertAttributeExists("baz");
+ flowFile.assertAttributeExists("qux");
+ flowFile.assertAttributeExists("zab");
+ flowFile.assertAttributeNotExists("zar");
+
+ flowFile.assertAttributeEquals("foo", "value1");
+ flowFile.assertAttributeEquals("bar", "value2");
+ flowFile.assertAttributeEquals("baz", "value3");
+ flowFile.assertAttributeEquals("qux", "");
+ flowFile.assertAttributeEquals("zab", "null");
+ }
+
+ @Test
+ public void testLookupAttributeUnmatched() throws InitializationException {
+ final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
+
+ final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
+ runner.addControllerService("simple-key-value-lookup-service", service);
+ runner.setProperty(service, "key1", "value1");
+ runner.setProperty(service, "key2", "value2");
+ runner.setProperty(service, "key3", "value3");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
+ runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "false");
+ runner.setProperty("baz", "${attr1}");
+ runner.assertValid();
+
+ final Map attributes = new HashMap<>();
+ attributes.put("attr1", "key4");
+
+ runner.enqueue("some content".getBytes(), attributes);
+ runner.run(1, false);
+ runner.assertAllFlowFilesTransferred(LookupAttribute.REL_UNMATCHED, 1);
+
+ final MockFlowFile flowFile = runner.getFlowFilesForRelationship(LookupAttribute.REL_UNMATCHED).get(0);
+
+ assertNotNull(flowFile);
+
+ flowFile.assertAttributeExists("attr1");
+ flowFile.assertAttributeNotExists("baz");
+ flowFile.assertAttributeEquals("attr1", "key4");
+ }
+
+ @Test
+ public void testCustomValidateInvalidLookupService() throws InitializationException {
+ final InvalidLookupService service = new InvalidLookupService();
+
+ final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
+ runner.addControllerService("invalid-lookup-service", service);
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "invalid-lookup-service");
+ runner.setProperty("foo", "key1");
+ runner.assertNotValid();
+ }
+
+ @Test
+ public void testCustomValidateMissingDynamicProps() throws InitializationException {
+ final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
+
+ final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
+ runner.addControllerService("simple-key-value-lookup-service", service);
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+ runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
+ runner.assertNotValid();
+ }
+
+ private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
+ @Override
+ public Optional lookup(Map coordinates) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Set getRequiredKeys() {
+ final Set requiredKeys = new HashSet<>();
+ requiredKeys.add("key1");
+ requiredKeys.add("key2");
+ return Collections.unmodifiableSet(requiredKeys);
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
index aa2721bdb2..216dd5a41d 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-service-api/src/main/java/org/apache/nifi/lookup/StringLookupService.java
@@ -27,12 +27,15 @@ public interface StringLookupService extends LookupService {
*
* @param coordinates the coordinates to lookup
* @return an Optional String that represents the value for the given coordinates
+ *
+ * @throws LookupFailureException if unable to lookup a value for the given key
*/
@Override
- Optional lookup(Map coordinates);
+ Optional lookup(Map coordinates) throws LookupFailureException;
@Override
default Class> getValueType() {
return String.class;
}
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE
index 0fe04666e3..5fd0f6645c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services-nar/src/main/resources/META-INF/NOTICE
@@ -10,6 +10,30 @@ Apache Software License v2
The following binary components are provided under the Apache Software License v2
+ (ASLv2) Apache Commons Configuration
+ The following NOTICE information applies:
+ Apache Commons Configuration
+ Copyright 2001-2017 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Commons CSV
+ The following NOTICE information applies:
+ Apache Commons CSV
+ Copyright 2005-2016 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ (ASLv2) Apache Commons BeanUtils
+ The following NOTICE information applies:
+ Apache Commons BeanUtils
+ Copyright 2000-2016 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
(ASLv2) Apache Commons Lang
The following NOTICE information applies:
Apache Commons Lang
@@ -22,7 +46,7 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
Apache HttpClient
Copyright 1999-2014 The Apache Software Foundation
-
+
Apache HttpCore
Copyright 2005-2014 The Apache Software Foundation
@@ -60,7 +84,7 @@ The following binary components are provided under the Apache Software License v
The following NOTICE information applies:
GeoIP2 Java API
This software is Copyright (c) 2013 by MaxMind, Inc.
-
+
************************
Creative Commons Attribution-ShareAlike 3.0
************************
@@ -68,6 +92,3 @@ Creative Commons Attribution-ShareAlike 3.0
The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)
-
-
-
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
index bd711a108d..a76731aaf5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml
@@ -1,14 +1,14 @@
-
4.0.0
@@ -24,6 +24,10 @@
org.apache.nifi
nifi-api
+
+ org.apache.nifi
+ nifi-processor-utils
+
org.apache.nifi
nifi-lookup-service-api
@@ -36,6 +40,21 @@
org.apache.nifi
nifi-record
+
+ org.apache.commons
+ commons-configuration2
+ 2.1.1
+
+
+ org.apache.commons
+ commons-csv
+ 1.4
+
+
+ commons-beanutils
+ commons-beanutils
+ 1.9.3
+
com.maxmind.geoip2
geoip2
@@ -47,5 +66,36 @@
+
+ org.apache.nifi
+ nifi-mock
+ test
+
+
+ org.slf4j
+ slf4j-simple
+ test
+
+
+ junit
+ junit
+ test
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ src/test/resources/test.csv
+ src/test/resources/test.properties
+ src/test/resources/test.xml
+
+
+
+
+
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/PropertiesFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/PropertiesFileLookupService.java
new file mode 100644
index 0000000000..7d78cfd527
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/PropertiesFileLookupService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.configuration2.PropertiesConfiguration;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.lookup.configuration2.CommonsConfigurationLookupService;
+
+@Tags({"lookup", "cache", "enrich", "join", "properties", "reloadable", "key", "value"})
+@CapabilityDescription("A reloadable properties file-based lookup service")
+public class PropertiesFileLookupService extends CommonsConfigurationLookupService {
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
new file mode 100644
index 0000000000..d5aa164fba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleCsvFileLookupService.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVRecord;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
+import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
+
+@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
+@CapabilityDescription("A reloadable properties file-based lookup service")
+public class SimpleCsvFileLookupService extends AbstractControllerService implements StringLookupService {
+
+ private static final String KEY = "key";
+
+ private static final Set REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+ public static final PropertyDescriptor CSV_FILE =
+ new PropertyDescriptor.Builder()
+ .name("csv-file")
+ .displayName("CSV File")
+ .description("A CSV file.")
+ .required(true)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
+ .name("CSV Format")
+ .description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.")
+ .expressionLanguageSupported(false)
+ .allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
+ .defaultValue(CSVFormat.Predefined.Default.toString())
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
+ new PropertyDescriptor.Builder()
+ .name("lookup-key-column")
+ .displayName("Lookup Key Column")
+ .description("Lookup key column.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
+ new PropertyDescriptor.Builder()
+ .name("lookup-value-column")
+ .displayName("Lookup Value Column")
+ .description("Lookup value column.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor IGNORE_DUPLICATES =
+ new PropertyDescriptor.Builder()
+ .name("ignore-duplicates")
+ .displayName("Ignore Duplicates")
+ .description("Ignore duplicate keys for records in the CSV file.")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+
+ private List properties;
+
+ private volatile ConcurrentMap cache;
+
+ private volatile String csvFile;
+
+ private volatile CSVFormat csvFormat;
+
+ private volatile String lookupKeyColumn;
+
+ private volatile String lookupValueColumn;
+
+ private volatile boolean ignoreDuplicates;
+
+ private volatile SynchronousFileWatcher watcher;
+
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private void loadCache() throws IllegalStateException, IOException {
+ if (lock.tryLock()) {
+ try {
+ final ComponentLog logger = getLogger();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Loading lookup table from file: " + csvFile);
+ }
+
+ final Map properties = new HashMap<>();
+ final FileReader reader = new FileReader(csvFile);
+ final Iterable records = csvFormat.withFirstRecordAsHeader().parse(reader);
+ for (final CSVRecord record : records) {
+ final String key = record.get(lookupKeyColumn);
+ final String value = record.get(lookupValueColumn);
+ if (StringUtils.isBlank(key)) {
+ throw new IllegalStateException("Empty lookup key encountered in: " + csvFile);
+ } else if (!ignoreDuplicates && properties.containsKey(key)) {
+ throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + csvFile);
+ } else if (ignoreDuplicates && properties.containsKey(key)) {
+ logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, csvFile});
+ }
+ properties.put(key, value);
+ }
+
+ this.cache = new ConcurrentHashMap<>(properties);
+
+ if (cache.isEmpty()) {
+ logger.warn("Lookup table is empty after reading file: " + csvFile);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
+ final List properties = new ArrayList<>();
+ properties.add(CSV_FILE);
+ properties.add(CSV_FORMAT);
+ properties.add(LOOKUP_KEY_COLUMN);
+ properties.add(LOOKUP_VALUE_COLUMN);
+ properties.add(IGNORE_DUPLICATES);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException {
+ this.csvFile = context.getProperty(CSV_FILE).getValue();
+ this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
+ this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).getValue();
+ this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).getValue();
+ this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
+ this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
+ try {
+ loadCache();
+ } catch (final IllegalStateException e) {
+ throw new InitializationException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public Optional lookup(final Map coordinates) throws LookupFailureException {
+ if (coordinates == null) {
+ return Optional.empty();
+ }
+
+ final String key = coordinates.get(KEY);
+ if (StringUtils.isBlank(key)) {
+ return Optional.empty();
+ }
+
+ try {
+ if (watcher != null && watcher.checkAndReset()) {
+ loadCache();
+ }
+ } catch (final IllegalStateException | IOException e) {
+ throw new LookupFailureException(e.getMessage(), e);
+ }
+
+ return Optional.ofNullable(cache.get(key));
+ }
+
+ @Override
+ public Set getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
index 4ed75b2150..7176260b7e 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/SimpleKeyValueLookupService.java
@@ -47,6 +47,7 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple
.required(false)
.dynamic(true)
.addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
.build();
}
@@ -74,4 +75,5 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple
public Set getRequiredKeys() {
return REQUIRED_KEYS;
}
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/XMLFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/XMLFileLookupService.java
new file mode 100644
index 0000000000..e522e31353
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/XMLFileLookupService.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import org.apache.commons.configuration2.XMLConfiguration;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.lookup.configuration2.CommonsConfigurationLookupService;
+
+@Tags({"lookup", "cache", "enrich", "join", "xml", "reloadable", "key", "value"})
+@CapabilityDescription("A reloadable properties file-based lookup service")
+public class XMLFileLookupService extends CommonsConfigurationLookupService {
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/configuration2/CommonsConfigurationLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/configuration2/CommonsConfigurationLookupService.java
new file mode 100644
index 0000000000..a8be80f2dd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/configuration2/CommonsConfigurationLookupService.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup.configuration2;
+
+import java.io.File;
+import java.lang.reflect.ParameterizedType;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.FileBasedConfiguration;
+import org.apache.commons.configuration2.builder.ConfigurationBuilderEvent;
+import org.apache.commons.configuration2.builder.ReloadingFileBasedConfigurationBuilder;
+import org.apache.commons.configuration2.builder.fluent.FileBasedBuilderParameters;
+import org.apache.commons.configuration2.builder.fluent.Parameters;
+import org.apache.commons.configuration2.event.EventListener;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.lookup.StringLookupService;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+/**
+ * This abstract class defines a generic {@link LookupService} backed by an
+ * Apache Commons Configuration {@link FileBasedConfiguration}.
+ *
+ */
+public abstract class CommonsConfigurationLookupService extends AbstractControllerService implements StringLookupService {
+
+ private static final String KEY = "key";
+
+ private static final Set REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
+
+ public static final PropertyDescriptor CONFIGURATION_FILE =
+ new PropertyDescriptor.Builder()
+ .name("configuration-file")
+ .displayName("Configuration File")
+ .description("A configuration file")
+ .required(true)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ private final Class resultClass = (Class) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+
+ private List properties;
+
+ private volatile ReloadingFileBasedConfigurationBuilder builder;
+
+ private Configuration getConfiguration() {
+ try {
+ if (builder != null) {
+ return builder.getConfiguration();
+ }
+ } catch (final ConfigurationException e) {
+ // TODO: Need to fail starting the service if this happens
+ getLogger().error(e.getMessage(), e);
+ }
+ return null;
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
+ final List properties = new ArrayList<>();
+ properties.add(CONFIGURATION_FILE);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws InitializationException {
+ final String config = context.getProperty(CONFIGURATION_FILE).getValue();
+ final FileBasedBuilderParameters params = new Parameters().fileBased().setFile(new File(config));
+ this.builder = new ReloadingFileBasedConfigurationBuilder<>(resultClass).configure(params);
+ builder.addEventListener(ConfigurationBuilderEvent.CONFIGURATION_REQUEST,
+ new EventListener() {
+ @Override
+ public void onEvent(ConfigurationBuilderEvent event) {
+ if (builder.getReloadingController().checkForReloading(null)) {
+ getLogger().debug("Reloading " + config);
+ }
+ }
+ });
+ }
+
+ @Override
+ public Optional lookup(final Map coordinates) {
+ if (coordinates == null) {
+ return Optional.empty();
+ }
+
+ final String key = coordinates.get(KEY);
+ if (StringUtils.isBlank(key)) {
+ return Optional.empty();
+ }
+
+ final Configuration config = getConfiguration();
+ if (config != null) {
+ final Object value = config.getProperty(key);
+ if (value != null) {
+ return Optional.of(String.valueOf(value));
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ @Override
+ public Set getRequiredKeys() {
+ return REQUIRED_KEYS;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index cdff77ca90..ceaefa8bbc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
org.apache.nifi.lookup.maxmind.IPLookupService
-org.apache.nifi.lookup.SimpleKeyValueLookupService
\ No newline at end of file
+org.apache.nifi.lookup.PropertiesFileLookupService
+org.apache.nifi.lookup.SimpleKeyValueLookupService
+org.apache.nifi.lookup.SimpleCsvFileLookupService
+org.apache.nifi.lookup.XMLFileLookupService
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestProcessor.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestProcessor.java
new file mode 100644
index 0000000000..da1d46665b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestProcessor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+
+public class TestProcessor extends AbstractProcessor {
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+ }
+
+ @Override
+ protected List getSupportedPropertyDescriptors() {
+ List properties = new ArrayList<>();
+ properties.add(new PropertyDescriptor.Builder()
+ .name("LookupService test processor")
+ .description("LookupService test processor")
+ .identifiesControllerService(LookupService.class)
+ .required(true)
+ .build());
+ return properties;
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestPropertiesFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestPropertiesFileLookupService.java
new file mode 100644
index 0000000000..7cfd1dd150
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestPropertiesFileLookupService.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class TestPropertiesFileLookupService {
+
+ final static Optional EMPTY_STRING = Optional.empty();
+
+ @Test
+ public void testPropertiesFileLookupService() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+ final PropertiesFileLookupService service = new PropertiesFileLookupService();
+
+ runner.addControllerService("properties-file-lookup-service", service);
+ runner.setProperty(service, PropertiesFileLookupService.CONFIGURATION_FILE, "src/test/resources/test.properties");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ final PropertiesFileLookupService lookupService =
+ (PropertiesFileLookupService) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("properties-file-lookup-service");
+
+ assertThat(lookupService, instanceOf(LookupService.class));
+
+ final Optional property1 = lookupService.lookup(Collections.singletonMap("key", "property.1"));
+ assertEquals(Optional.of("this is property 1"), property1);
+
+ final Optional property2 = lookupService.lookup(Collections.singletonMap("key", "property.2"));
+ assertEquals(Optional.of("this is property 2"), property2);
+
+ final Optional property3 = lookupService.lookup(Collections.singletonMap("key", "property.3"));
+ assertEquals(EMPTY_STRING, property3);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
new file mode 100644
index 0000000000..6e2b16ed0e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleCsvFileLookupService.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class TestSimpleCsvFileLookupService {
+
+ final static Optional EMPTY_STRING = Optional.empty();
+
+ @Test
+ public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+ final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
+
+ runner.addControllerService("csv-file-lookup-service", service);
+ runner.setProperty(service, SimpleCsvFileLookupService.CSV_FILE, "src/test/resources/test.csv");
+ runner.setProperty(service, SimpleCsvFileLookupService.CSV_FORMAT, "RFC4180");
+ runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_KEY_COLUMN, "key");
+ runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_VALUE_COLUMN, "value");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ final SimpleCsvFileLookupService lookupService =
+ (SimpleCsvFileLookupService) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("csv-file-lookup-service");
+
+ assertThat(lookupService, instanceOf(LookupService.class));
+
+ final Optional property1 = lookupService.lookup(Collections.singletonMap("key", "property.1"));
+ assertEquals(Optional.of("this is property 1"), property1);
+
+ final Optional property2 = lookupService.lookup(Collections.singletonMap("key", "property.2"));
+ assertEquals(Optional.of("this is property 2"), property2);
+
+ final Optional property3 = lookupService.lookup(Collections.singletonMap("key", "property.3"));
+ assertEquals(EMPTY_STRING, property3);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleKeyValueLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleKeyValueLookupService.java
new file mode 100644
index 0000000000..e46423a5b9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestSimpleKeyValueLookupService.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class TestSimpleKeyValueLookupService {
+
+ final static Optional EMPTY_STRING = Optional.empty();
+
+ @Test
+ public void testSimpleKeyValueLookupService() throws InitializationException {
+ final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
+
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+ runner.addControllerService("simple-key-value-lookup-service", service);
+ runner.setProperty(service, "key1", "value1");
+ runner.setProperty(service, "key2", "value2");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ assertThat(service, instanceOf(LookupService.class));
+
+ final Optional get1 = service.lookup(Collections.singletonMap("key", "key1"));
+ assertEquals(Optional.of("value1"), get1);
+
+ final Optional get2 = service.lookup(Collections.singletonMap("key", "key2"));
+ assertEquals(Optional.of("value2"), get2);
+
+ final Optional get3 = service.lookup(Collections.singletonMap("key", "key3"));
+ assertEquals(EMPTY_STRING, get3);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestXMLFileLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestXMLFileLookupService.java
new file mode 100644
index 0000000000..606385813f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestXMLFileLookupService.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.lookup;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+public class TestXMLFileLookupService {
+
+ final static Optional EMPTY_STRING = Optional.empty();
+
+ @Test
+ public void testXMLFileLookupService() throws InitializationException {
+ final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
+ final XMLFileLookupService service = new XMLFileLookupService();
+
+ runner.addControllerService("xml-file-lookup-service", service);
+ runner.setProperty(service, XMLFileLookupService.CONFIGURATION_FILE, "src/test/resources/test.xml");
+ runner.enableControllerService(service);
+ runner.assertValid(service);
+
+ final XMLFileLookupService lookupService =
+ (XMLFileLookupService) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("xml-file-lookup-service");
+
+ assertThat(lookupService, instanceOf(LookupService.class));
+
+ final Optional property1 = lookupService.lookup(Collections.singletonMap("key", "properties.property(0)"));
+ assertEquals(Optional.of("this is property 1"), property1);
+
+ final Optional property2 = lookupService.lookup(Collections.singletonMap("key", "properties.property(1)"));
+ assertEquals(Optional.of("this is property 2"), property2);
+
+ final Optional property3 = lookupService.lookup(Collections.singletonMap("key", "properties.property(2)[@value]"));
+ assertEquals(Optional.of("this is property 3"), property3);
+
+ final Optional property4 = lookupService.lookup(Collections.singletonMap("key", "properties.property(3)"));
+ assertEquals(EMPTY_STRING, property4);
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.csv b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.csv
new file mode 100644
index 0000000000..7a21f11194
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.csv
@@ -0,0 +1,3 @@
+"key","value","created_at"
+"property.1","this is property 1","2017-04-01"
+"property.2","this is property 2","2017-04-02"
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.properties b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.properties
new file mode 100644
index 0000000000..e805aa20bd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.properties
@@ -0,0 +1,2 @@
+property.1=this is property 1
+property.2=this is property 2
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.xml
new file mode 100644
index 0000000000..30c34d7baa
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/resources/test.xml
@@ -0,0 +1,8 @@
+
+
+
+ this is property 1
+ this is property 2
+
+
+