mirror of https://github.com/apache/nifi.git
NIFI-3404 Added LookupAttribute processor and lookup controller services
Signed-off-by: Joey Frazee <jfrazee@apache.org>
This commit is contained in:
parent
23cbc3b346
commit
46e2420d74
|
@ -1,13 +1,13 @@
|
|||
<?xml version="1.0"?>
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
|
||||
license agreements. See the NOTICE file distributed with this work for additional
|
||||
information regarding copyright ownership. The ASF licenses this file to
|
||||
You under the Apache License, Version 2.0 (the "License"); you may not use
|
||||
this file except in compliance with the License. You may obtain a copy of
|
||||
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
|
||||
by applicable law or agreed to in writing, software distributed under the
|
||||
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
|
||||
OF ANY KIND, either express or implied. See the License for the specific
|
||||
language governing permissions and limitations under the License. -->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
@ -255,6 +255,11 @@
|
|||
<artifactId>nifi-schema-registry-service-api</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-lookup-services</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.derby</groupId>
|
||||
<artifactId>derby</artifactId>
|
||||
|
|
|
@ -0,0 +1,252 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
|
||||
import org.apache.nifi.annotation.behavior.SideEffectFree;
|
||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.expression.AttributeExpression;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.lookup.LookupFailureException;
|
||||
import org.apache.nifi.lookup.LookupService;
|
||||
import org.apache.nifi.lookup.StringLookupService;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.ProcessorInitializationContext;
|
||||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
@EventDriven
|
||||
@SideEffectFree
|
||||
@SupportsBatching
|
||||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"lookup", "cache", "enrich", "join", "attributes", "Attribute Expression Language"})
|
||||
@CapabilityDescription("Lookup attributes from a lookup service")
|
||||
@DynamicProperty(name = "The name of the attribute to add to the FlowFile",
|
||||
value = "The name of the key or property to retrieve from the lookup service",
|
||||
supportsExpressionLanguage = true,
|
||||
description = "Adds a FlowFile attribute specified by the dynamic property's key with the value found in the lookup service using the the dynamic property's value")
|
||||
public class LookupAttribute extends AbstractProcessor {
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_SERVICE =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-service")
|
||||
.displayName("Lookup Service")
|
||||
.description("The lookup service to use for attribute lookups")
|
||||
.identifiesControllerService(StringLookupService.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor INCLUDE_EMPTY_VALUES =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("include-empty-values")
|
||||
.displayName("Include Empty Values")
|
||||
.description("Include null or blank values for keys that are null or blank")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_MATCHED = new Relationship.Builder()
|
||||
.description("FlowFiles with matching lookups are routed to this relationship")
|
||||
.name("matched")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_UNMATCHED = new Relationship.Builder()
|
||||
.description("FlowFiles with missing lookups are routed to this relationship")
|
||||
.name("unmatched")
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_FAILURE = new Relationship.Builder()
|
||||
.description("FlowFiles with failing lookups are routed to this relationship")
|
||||
.name("failure")
|
||||
.build();
|
||||
|
||||
private List<PropertyDescriptor> descriptors;
|
||||
|
||||
private Set<Relationship> relationships;
|
||||
|
||||
private Map<PropertyDescriptor, PropertyValue> dynamicProperties;
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(validationContext));
|
||||
|
||||
final Set<PropertyDescriptor> dynamicProperties = validationContext.getProperties().keySet().stream()
|
||||
.filter(prop -> prop.isDynamic())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
if (dynamicProperties == null || dynamicProperties.size() < 1) {
|
||||
errors.add(new ValidationResult.Builder()
|
||||
.subject("User-Defined Properties")
|
||||
.valid(false)
|
||||
.explanation("At least one user-defined property must be specified.")
|
||||
.build());
|
||||
}
|
||||
|
||||
final Set<String> requiredKeys = validationContext.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class).getRequiredKeys();
|
||||
if (requiredKeys == null || requiredKeys.size() != 1) {
|
||||
errors.add(new ValidationResult.Builder()
|
||||
.subject(LOOKUP_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation("LookupAttribute requires a key-value lookup service supporting exactly one required key.")
|
||||
.build());
|
||||
}
|
||||
|
||||
return errors;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return descriptors;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
|
||||
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<Relationship> getRelationships() {
|
||||
return relationships;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ProcessorInitializationContext context) {
|
||||
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
|
||||
descriptors.add(LOOKUP_SERVICE);
|
||||
descriptors.add(INCLUDE_EMPTY_VALUES);
|
||||
this.descriptors = Collections.unmodifiableList(descriptors);
|
||||
|
||||
final Set<Relationship> relationships = new HashSet<Relationship>();
|
||||
relationships.add(REL_MATCHED);
|
||||
relationships.add(REL_UNMATCHED);
|
||||
relationships.add(REL_FAILURE);
|
||||
this.relationships = Collections.unmodifiableSet(relationships);
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void onScheduled(final ProcessContext context) {
|
||||
// Load up all the dynamic properties once for use later in onTrigger
|
||||
final Map<PropertyDescriptor, PropertyValue> dynamicProperties = new HashMap<>();
|
||||
for (final Map.Entry<PropertyDescriptor, String> e : context.getProperties().entrySet()) {
|
||||
final PropertyDescriptor descriptor = e.getKey();
|
||||
if (descriptor.isDynamic()) {
|
||||
final PropertyValue value = context.getProperty(descriptor);
|
||||
dynamicProperties.put(descriptor, value);
|
||||
}
|
||||
}
|
||||
this.dynamicProperties = Collections.unmodifiableMap(dynamicProperties);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
final ComponentLog logger = getLogger();
|
||||
final LookupService lookupService = context.getProperty(LOOKUP_SERVICE).asControllerService(LookupService.class);
|
||||
final boolean includeEmptyValues = context.getProperty(INCLUDE_EMPTY_VALUES).asBoolean();
|
||||
for (FlowFile flowFile : session.get(50)) {
|
||||
try {
|
||||
onTrigger(logger, lookupService, includeEmptyValues, flowFile, session);
|
||||
} catch (final IOException e) {
|
||||
throw new ProcessException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void onTrigger(ComponentLog logger, LookupService lookupService,
|
||||
boolean includeEmptyValues, FlowFile flowFile, ProcessSession session)
|
||||
throws ProcessException, IOException {
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
|
||||
|
||||
boolean matched = false;
|
||||
try {
|
||||
final Set<String> requiredKeys = lookupService.getRequiredKeys();
|
||||
if (requiredKeys == null || requiredKeys.size() != 1) {
|
||||
throw new ProcessException("LookupAttribute requires a key-value lookup service supporting exactly one required key, was: " +
|
||||
(requiredKeys == null ? "null" : String.valueOf(requiredKeys.size())));
|
||||
}
|
||||
|
||||
final String coordinateKey = requiredKeys.iterator().next();
|
||||
for (final Map.Entry<PropertyDescriptor, PropertyValue> e : dynamicProperties.entrySet()) {
|
||||
final PropertyValue lookupKeyExpression = e.getValue();
|
||||
final String lookupKey = lookupKeyExpression.evaluateAttributeExpressions(flowFile).getValue();
|
||||
final String attributeName = e.getKey().getName();
|
||||
final Optional<String> attributeValue = lookupService.lookup(Collections.singletonMap(coordinateKey, lookupKey));
|
||||
matched = putAttribute(attributeName, attributeValue, attributes, includeEmptyValues, logger) || matched;
|
||||
|
||||
if (!matched && logger.isDebugEnabled()) {
|
||||
logger.debug("No such value for key: {}", new Object[]{lookupKey});
|
||||
}
|
||||
}
|
||||
} catch (final LookupFailureException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
session.transfer(flowFile, REL_FAILURE);
|
||||
}
|
||||
|
||||
flowFile = session.putAllAttributes(flowFile, attributes);
|
||||
session.transfer(flowFile, matched ? REL_MATCHED : REL_UNMATCHED);
|
||||
}
|
||||
|
||||
private boolean putAttribute(final String attributeName, final Optional<String> attributeValue, final Map<String, String> attributes, final boolean includeEmptyValues, final ComponentLog logger) {
|
||||
boolean matched = false;
|
||||
if (attributeValue.isPresent() && StringUtils.isNotBlank(attributeValue.get())) {
|
||||
attributes.put(attributeName, attributeValue.get());
|
||||
matched = true;
|
||||
} else if (includeEmptyValues) {
|
||||
attributes.put(attributeName, attributeValue.isPresent() ? "" : "null");
|
||||
matched = true;
|
||||
}
|
||||
return matched;
|
||||
}
|
||||
|
||||
}
|
|
@ -59,6 +59,7 @@ org.apache.nifi.processors.standard.ListenUDP
|
|||
org.apache.nifi.processors.standard.ListSFTP
|
||||
org.apache.nifi.processors.standard.LogAttribute
|
||||
org.apache.nifi.processors.standard.LogMessage
|
||||
org.apache.nifi.processors.standard.LookupAttribute
|
||||
org.apache.nifi.processors.standard.LookupRecord
|
||||
org.apache.nifi.processors.standard.MergeContent
|
||||
org.apache.nifi.processors.standard.ModifyBytes
|
||||
|
@ -105,4 +106,4 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache
|
|||
org.apache.nifi.processors.standard.ListFTP
|
||||
org.apache.nifi.processors.standard.FetchFTP
|
||||
org.apache.nifi.processors.standard.UpdateCounter
|
||||
org.apache.nifi.processors.standard.UpdateRecord
|
||||
org.apache.nifi.processors.standard.UpdateRecord
|
||||
|
|
|
@ -0,0 +1,158 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.standard;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.lookup.SimpleKeyValueLookupService;
|
||||
import org.apache.nifi.lookup.StringLookupService;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
|
||||
public class TestLookupAttribute {
|
||||
|
||||
@Test
|
||||
public void testKeyValueLookupAttribute() throws InitializationException {
|
||||
final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
|
||||
runner.addControllerService("simple-key-value-lookup-service", service);
|
||||
runner.setProperty(service, "key1", "value1");
|
||||
runner.setProperty(service, "key2", "value2");
|
||||
runner.setProperty(service, "key3", "value3");
|
||||
runner.setProperty(service, "key4", " ");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
|
||||
runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "true");
|
||||
runner.setProperty("foo", "key1");
|
||||
runner.setProperty("bar", "key2");
|
||||
runner.setProperty("baz", "${attr1}");
|
||||
runner.setProperty("qux", "key4");
|
||||
runner.setProperty("zab", "key5");
|
||||
runner.assertValid();
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("attr1", "key3");
|
||||
|
||||
runner.enqueue("some content".getBytes(), attributes);
|
||||
runner.run(1, false);
|
||||
runner.assertAllFlowFilesTransferred(LookupAttribute.REL_MATCHED, 1);
|
||||
|
||||
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(LookupAttribute.REL_MATCHED).get(0);
|
||||
|
||||
assertNotNull(flowFile);
|
||||
|
||||
flowFile.assertAttributeExists("foo");
|
||||
flowFile.assertAttributeExists("bar");
|
||||
flowFile.assertAttributeExists("baz");
|
||||
flowFile.assertAttributeExists("qux");
|
||||
flowFile.assertAttributeExists("zab");
|
||||
flowFile.assertAttributeNotExists("zar");
|
||||
|
||||
flowFile.assertAttributeEquals("foo", "value1");
|
||||
flowFile.assertAttributeEquals("bar", "value2");
|
||||
flowFile.assertAttributeEquals("baz", "value3");
|
||||
flowFile.assertAttributeEquals("qux", "");
|
||||
flowFile.assertAttributeEquals("zab", "null");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLookupAttributeUnmatched() throws InitializationException {
|
||||
final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
|
||||
runner.addControllerService("simple-key-value-lookup-service", service);
|
||||
runner.setProperty(service, "key1", "value1");
|
||||
runner.setProperty(service, "key2", "value2");
|
||||
runner.setProperty(service, "key3", "value3");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
|
||||
runner.setProperty(LookupAttribute.INCLUDE_EMPTY_VALUES, "false");
|
||||
runner.setProperty("baz", "${attr1}");
|
||||
runner.assertValid();
|
||||
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
attributes.put("attr1", "key4");
|
||||
|
||||
runner.enqueue("some content".getBytes(), attributes);
|
||||
runner.run(1, false);
|
||||
runner.assertAllFlowFilesTransferred(LookupAttribute.REL_UNMATCHED, 1);
|
||||
|
||||
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(LookupAttribute.REL_UNMATCHED).get(0);
|
||||
|
||||
assertNotNull(flowFile);
|
||||
|
||||
flowFile.assertAttributeExists("attr1");
|
||||
flowFile.assertAttributeNotExists("baz");
|
||||
flowFile.assertAttributeEquals("attr1", "key4");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidateInvalidLookupService() throws InitializationException {
|
||||
final InvalidLookupService service = new InvalidLookupService();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
|
||||
runner.addControllerService("invalid-lookup-service", service);
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "invalid-lookup-service");
|
||||
runner.setProperty("foo", "key1");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomValidateMissingDynamicProps() throws InitializationException {
|
||||
final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(new LookupAttribute());
|
||||
runner.addControllerService("simple-key-value-lookup-service", service);
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
runner.setProperty(LookupAttribute.LOOKUP_SERVICE, "simple-key-value-lookup-service");
|
||||
runner.assertNotValid();
|
||||
}
|
||||
|
||||
private static class InvalidLookupService extends AbstractControllerService implements StringLookupService {
|
||||
@Override
|
||||
public Optional<String> lookup(Map<String, String> coordinates) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRequiredKeys() {
|
||||
final Set<String> requiredKeys = new HashSet<>();
|
||||
requiredKeys.add("key1");
|
||||
requiredKeys.add("key2");
|
||||
return Collections.unmodifiableSet(requiredKeys);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -27,12 +27,15 @@ public interface StringLookupService extends LookupService<String> {
|
|||
*
|
||||
* @param coordinates the coordinates to lookup
|
||||
* @return an Optional String that represents the value for the given coordinates
|
||||
*
|
||||
* @throws LookupFailureException if unable to lookup a value for the given key
|
||||
*/
|
||||
@Override
|
||||
Optional<String> lookup(Map<String, String> coordinates);
|
||||
Optional<String> lookup(Map<String, String> coordinates) throws LookupFailureException;
|
||||
|
||||
@Override
|
||||
default Class<?> getValueType() {
|
||||
return String.class;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -10,6 +10,30 @@ Apache Software License v2
|
|||
|
||||
The following binary components are provided under the Apache Software License v2
|
||||
|
||||
(ASLv2) Apache Commons Configuration
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Configuration
|
||||
Copyright 2001-2017 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
(ASLv2) Apache Commons CSV
|
||||
The following NOTICE information applies:
|
||||
Apache Commons CSV
|
||||
Copyright 2005-2016 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
(ASLv2) Apache Commons BeanUtils
|
||||
The following NOTICE information applies:
|
||||
Apache Commons BeanUtils
|
||||
Copyright 2000-2016 The Apache Software Foundation
|
||||
|
||||
This product includes software developed at
|
||||
The Apache Software Foundation (http://www.apache.org/).
|
||||
|
||||
(ASLv2) Apache Commons Lang
|
||||
The following NOTICE information applies:
|
||||
Apache Commons Lang
|
||||
|
@ -22,7 +46,7 @@ The following binary components are provided under the Apache Software License v
|
|||
The following NOTICE information applies:
|
||||
Apache HttpClient
|
||||
Copyright 1999-2014 The Apache Software Foundation
|
||||
|
||||
|
||||
Apache HttpCore
|
||||
Copyright 2005-2014 The Apache Software Foundation
|
||||
|
||||
|
@ -60,7 +84,7 @@ The following binary components are provided under the Apache Software License v
|
|||
The following NOTICE information applies:
|
||||
GeoIP2 Java API
|
||||
This software is Copyright (c) 2013 by MaxMind, Inc.
|
||||
|
||||
|
||||
************************
|
||||
Creative Commons Attribution-ShareAlike 3.0
|
||||
************************
|
||||
|
@ -68,6 +92,3 @@ Creative Commons Attribution-ShareAlike 3.0
|
|||
The following binary components are provided under the Creative Commons Attribution-ShareAlike 3.0. See project link for details.
|
||||
|
||||
(CCAS 3.0) MaxMind DB (https://github.com/maxmind/MaxMind-DB)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with this
|
||||
work for additional information regarding copyright ownership. The ASF licenses
|
||||
this file to You under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You may obtain
|
||||
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
|
||||
required by applicable law or agreed to in writing, software distributed
|
||||
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
|
||||
<!-- Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with this
|
||||
work for additional information regarding copyright ownership. The ASF licenses
|
||||
this file to You under the Apache License, Version 2.0 (the "License"); you
|
||||
may not use this file except in compliance with the License. You may obtain
|
||||
a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
|
||||
required by applicable law or agreed to in writing, software distributed
|
||||
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
|
||||
OR CONDITIONS OF ANY KIND, either express or implied. See the License for
|
||||
the specific language governing permissions and limitations under the License. -->
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
|
@ -24,6 +24,10 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-processor-utils</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-lookup-service-api</artifactId>
|
||||
|
@ -36,6 +40,21 @@
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-configuration2</artifactId>
|
||||
<version>2.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-csv</artifactId>
|
||||
<version>1.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
<version>1.9.3</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.maxmind.geoip2</groupId>
|
||||
<artifactId>geoip2</artifactId>
|
||||
|
@ -47,5 +66,36 @@
|
|||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-mock</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.rat</groupId>
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.children="append">
|
||||
<exclude>src/test/resources/test.csv</exclude>
|
||||
<exclude>src/test/resources/test.properties</exclude>
|
||||
<exclude>src/test/resources/test.xml</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.lookup.configuration2.CommonsConfigurationLookupService;
|
||||
|
||||
@Tags({"lookup", "cache", "enrich", "join", "properties", "reloadable", "key", "value"})
|
||||
@CapabilityDescription("A reloadable properties file-based lookup service")
|
||||
public class PropertiesFileLookupService extends CommonsConfigurationLookupService<PropertiesConfiguration> {
|
||||
|
||||
}
|
|
@ -0,0 +1,223 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.csv.CSVFormat;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.file.monitor.LastModifiedMonitor;
|
||||
import org.apache.nifi.util.file.monitor.SynchronousFileWatcher;
|
||||
|
||||
@Tags({"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value"})
|
||||
@CapabilityDescription("A reloadable properties file-based lookup service")
|
||||
public class SimpleCsvFileLookupService extends AbstractControllerService implements StringLookupService {
|
||||
|
||||
private static final String KEY = "key";
|
||||
|
||||
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
|
||||
|
||||
public static final PropertyDescriptor CSV_FILE =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("csv-file")
|
||||
.displayName("CSV File")
|
||||
.description("A CSV file.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder()
|
||||
.name("CSV Format")
|
||||
.description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.")
|
||||
.expressionLanguageSupported(false)
|
||||
.allowableValues(Arrays.asList(CSVFormat.Predefined.values()).stream().map(e -> e.toString()).collect(Collectors.toSet()))
|
||||
.defaultValue(CSVFormat.Predefined.Default.toString())
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_KEY_COLUMN =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-key-column")
|
||||
.displayName("Lookup Key Column")
|
||||
.description("Lookup key column.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor LOOKUP_VALUE_COLUMN =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("lookup-value-column")
|
||||
.displayName("Lookup Value Column")
|
||||
.description("Lookup value column.")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor IGNORE_DUPLICATES =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("ignore-duplicates")
|
||||
.displayName("Ignore Duplicates")
|
||||
.description("Ignore duplicate keys for records in the CSV file.")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("true")
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private volatile ConcurrentMap<String, String> cache;
|
||||
|
||||
private volatile String csvFile;
|
||||
|
||||
private volatile CSVFormat csvFormat;
|
||||
|
||||
private volatile String lookupKeyColumn;
|
||||
|
||||
private volatile String lookupValueColumn;
|
||||
|
||||
private volatile boolean ignoreDuplicates;
|
||||
|
||||
private volatile SynchronousFileWatcher watcher;
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
private void loadCache() throws IllegalStateException, IOException {
|
||||
if (lock.tryLock()) {
|
||||
try {
|
||||
final ComponentLog logger = getLogger();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Loading lookup table from file: " + csvFile);
|
||||
}
|
||||
|
||||
final Map<String, String> properties = new HashMap<>();
|
||||
final FileReader reader = new FileReader(csvFile);
|
||||
final Iterable<CSVRecord> records = csvFormat.withFirstRecordAsHeader().parse(reader);
|
||||
for (final CSVRecord record : records) {
|
||||
final String key = record.get(lookupKeyColumn);
|
||||
final String value = record.get(lookupValueColumn);
|
||||
if (StringUtils.isBlank(key)) {
|
||||
throw new IllegalStateException("Empty lookup key encountered in: " + csvFile);
|
||||
} else if (!ignoreDuplicates && properties.containsKey(key)) {
|
||||
throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + csvFile);
|
||||
} else if (ignoreDuplicates && properties.containsKey(key)) {
|
||||
logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, csvFile});
|
||||
}
|
||||
properties.put(key, value);
|
||||
}
|
||||
|
||||
this.cache = new ConcurrentHashMap<>(properties);
|
||||
|
||||
if (cache.isEmpty()) {
|
||||
logger.warn("Lookup table is empty after reading file: " + csvFile);
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(CSV_FILE);
|
||||
properties.add(CSV_FORMAT);
|
||||
properties.add(LOOKUP_KEY_COLUMN);
|
||||
properties.add(LOOKUP_VALUE_COLUMN);
|
||||
properties.add(IGNORE_DUPLICATES);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException, FileNotFoundException {
|
||||
this.csvFile = context.getProperty(CSV_FILE).getValue();
|
||||
this.csvFormat = CSVFormat.Predefined.valueOf(context.getProperty(CSV_FORMAT).getValue()).getFormat();
|
||||
this.lookupKeyColumn = context.getProperty(LOOKUP_KEY_COLUMN).getValue();
|
||||
this.lookupValueColumn = context.getProperty(LOOKUP_VALUE_COLUMN).getValue();
|
||||
this.ignoreDuplicates = context.getProperty(IGNORE_DUPLICATES).asBoolean();
|
||||
this.watcher = new SynchronousFileWatcher(Paths.get(csvFile), new LastModifiedMonitor(), 30000L);
|
||||
try {
|
||||
loadCache();
|
||||
} catch (final IllegalStateException e) {
|
||||
throw new InitializationException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> lookup(final Map<String, String> coordinates) throws LookupFailureException {
|
||||
if (coordinates == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
final String key = coordinates.get(KEY);
|
||||
if (StringUtils.isBlank(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
try {
|
||||
if (watcher != null && watcher.checkAndReset()) {
|
||||
loadCache();
|
||||
}
|
||||
} catch (final IllegalStateException | IOException e) {
|
||||
throw new LookupFailureException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
return Optional.ofNullable(cache.get(key));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRequiredKeys() {
|
||||
return REQUIRED_KEYS;
|
||||
}
|
||||
|
||||
}
|
|
@ -47,6 +47,7 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple
|
|||
.required(false)
|
||||
.dynamic(true)
|
||||
.addValidator(Validator.VALID)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
@ -74,4 +75,5 @@ public class SimpleKeyValueLookupService extends AbstractControllerService imple
|
|||
public Set<String> getRequiredKeys() {
|
||||
return REQUIRED_KEYS;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import org.apache.commons.configuration2.XMLConfiguration;
|
||||
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.lookup.configuration2.CommonsConfigurationLookupService;
|
||||
|
||||
@Tags({"lookup", "cache", "enrich", "join", "xml", "reloadable", "key", "value"})
|
||||
@CapabilityDescription("A reloadable properties file-based lookup service")
|
||||
public class XMLFileLookupService extends CommonsConfigurationLookupService<XMLConfiguration> {
|
||||
|
||||
}
|
|
@ -0,0 +1,143 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup.configuration2;
|
||||
|
||||
import java.io.File;
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.configuration2.Configuration;
|
||||
import org.apache.commons.configuration2.FileBasedConfiguration;
|
||||
import org.apache.commons.configuration2.builder.ConfigurationBuilderEvent;
|
||||
import org.apache.commons.configuration2.builder.ReloadingFileBasedConfigurationBuilder;
|
||||
import org.apache.commons.configuration2.builder.fluent.FileBasedBuilderParameters;
|
||||
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||
import org.apache.commons.configuration2.event.EventListener;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.AbstractControllerService;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.lookup.StringLookupService;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
|
||||
/**
|
||||
* This abstract class defines a generic {@link LookupService} backed by an
|
||||
* Apache Commons Configuration {@link FileBasedConfiguration}.
|
||||
*
|
||||
*/
|
||||
public abstract class CommonsConfigurationLookupService<T extends FileBasedConfiguration> extends AbstractControllerService implements StringLookupService {
|
||||
|
||||
private static final String KEY = "key";
|
||||
|
||||
private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of(KEY).collect(Collectors.toSet()));
|
||||
|
||||
public static final PropertyDescriptor CONFIGURATION_FILE =
|
||||
new PropertyDescriptor.Builder()
|
||||
.name("configuration-file")
|
||||
.displayName("Configuration File")
|
||||
.description("A configuration file")
|
||||
.required(true)
|
||||
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||
.expressionLanguageSupported(true)
|
||||
.build();
|
||||
|
||||
private final Class<T> resultClass = (Class<T>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
|
||||
|
||||
private List<PropertyDescriptor> properties;
|
||||
|
||||
private volatile ReloadingFileBasedConfigurationBuilder<T> builder;
|
||||
|
||||
private Configuration getConfiguration() {
|
||||
try {
|
||||
if (builder != null) {
|
||||
return builder.getConfiguration();
|
||||
}
|
||||
} catch (final ConfigurationException e) {
|
||||
// TODO: Need to fail starting the service if this happens
|
||||
getLogger().error(e.getMessage(), e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
|
||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(CONFIGURATION_FILE);
|
||||
this.properties = Collections.unmodifiableList(properties);
|
||||
}
|
||||
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) throws InitializationException {
|
||||
final String config = context.getProperty(CONFIGURATION_FILE).getValue();
|
||||
final FileBasedBuilderParameters params = new Parameters().fileBased().setFile(new File(config));
|
||||
this.builder = new ReloadingFileBasedConfigurationBuilder<>(resultClass).configure(params);
|
||||
builder.addEventListener(ConfigurationBuilderEvent.CONFIGURATION_REQUEST,
|
||||
new EventListener<ConfigurationBuilderEvent>() {
|
||||
@Override
|
||||
public void onEvent(ConfigurationBuilderEvent event) {
|
||||
if (builder.getReloadingController().checkForReloading(null)) {
|
||||
getLogger().debug("Reloading " + config);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<String> lookup(final Map<String, String> coordinates) {
|
||||
if (coordinates == null) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
final String key = coordinates.get(KEY);
|
||||
if (StringUtils.isBlank(key)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
final Configuration config = getConfiguration();
|
||||
if (config != null) {
|
||||
final Object value = config.getProperty(key);
|
||||
if (value != null) {
|
||||
return Optional.of(String.valueOf(value));
|
||||
}
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getRequiredKeys() {
|
||||
return REQUIRED_KEYS;
|
||||
}
|
||||
|
||||
}
|
|
@ -12,6 +12,8 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.nifi.lookup.maxmind.IPLookupService
|
||||
org.apache.nifi.lookup.SimpleKeyValueLookupService
|
||||
org.apache.nifi.lookup.PropertiesFileLookupService
|
||||
org.apache.nifi.lookup.SimpleKeyValueLookupService
|
||||
org.apache.nifi.lookup.SimpleCsvFileLookupService
|
||||
org.apache.nifi.lookup.XMLFileLookupService
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
public class TestProcessor extends AbstractProcessor {
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(new PropertyDescriptor.Builder()
|
||||
.name("LookupService test processor")
|
||||
.description("LookupService test processor")
|
||||
.identifiesControllerService(LookupService.class)
|
||||
.required(true)
|
||||
.build());
|
||||
return properties;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class TestPropertiesFileLookupService {
|
||||
|
||||
final static Optional<String> EMPTY_STRING = Optional.empty();
|
||||
|
||||
@Test
|
||||
public void testPropertiesFileLookupService() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final PropertiesFileLookupService service = new PropertiesFileLookupService();
|
||||
|
||||
runner.addControllerService("properties-file-lookup-service", service);
|
||||
runner.setProperty(service, PropertiesFileLookupService.CONFIGURATION_FILE, "src/test/resources/test.properties");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
final PropertiesFileLookupService lookupService =
|
||||
(PropertiesFileLookupService) runner.getProcessContext()
|
||||
.getControllerServiceLookup()
|
||||
.getControllerService("properties-file-lookup-service");
|
||||
|
||||
assertThat(lookupService, instanceOf(LookupService.class));
|
||||
|
||||
final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "property.1"));
|
||||
assertEquals(Optional.of("this is property 1"), property1);
|
||||
|
||||
final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "property.2"));
|
||||
assertEquals(Optional.of("this is property 2"), property2);
|
||||
|
||||
final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "property.3"));
|
||||
assertEquals(EMPTY_STRING, property3);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class TestSimpleCsvFileLookupService {
|
||||
|
||||
final static Optional<String> EMPTY_STRING = Optional.empty();
|
||||
|
||||
@Test
|
||||
public void testSimpleCsvFileLookupService() throws InitializationException, IOException, LookupFailureException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final SimpleCsvFileLookupService service = new SimpleCsvFileLookupService();
|
||||
|
||||
runner.addControllerService("csv-file-lookup-service", service);
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.CSV_FILE, "src/test/resources/test.csv");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.CSV_FORMAT, "RFC4180");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_KEY_COLUMN, "key");
|
||||
runner.setProperty(service, SimpleCsvFileLookupService.LOOKUP_VALUE_COLUMN, "value");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
final SimpleCsvFileLookupService lookupService =
|
||||
(SimpleCsvFileLookupService) runner.getProcessContext()
|
||||
.getControllerServiceLookup()
|
||||
.getControllerService("csv-file-lookup-service");
|
||||
|
||||
assertThat(lookupService, instanceOf(LookupService.class));
|
||||
|
||||
final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "property.1"));
|
||||
assertEquals(Optional.of("this is property 1"), property1);
|
||||
|
||||
final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "property.2"));
|
||||
assertEquals(Optional.of("this is property 2"), property2);
|
||||
|
||||
final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "property.3"));
|
||||
assertEquals(EMPTY_STRING, property3);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class TestSimpleKeyValueLookupService {
|
||||
|
||||
final static Optional<String> EMPTY_STRING = Optional.empty();
|
||||
|
||||
@Test
|
||||
public void testSimpleKeyValueLookupService() throws InitializationException {
|
||||
final SimpleKeyValueLookupService service = new SimpleKeyValueLookupService();
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
runner.addControllerService("simple-key-value-lookup-service", service);
|
||||
runner.setProperty(service, "key1", "value1");
|
||||
runner.setProperty(service, "key2", "value2");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
assertThat(service, instanceOf(LookupService.class));
|
||||
|
||||
final Optional<String> get1 = service.lookup(Collections.singletonMap("key", "key1"));
|
||||
assertEquals(Optional.of("value1"), get1);
|
||||
|
||||
final Optional<String> get2 = service.lookup(Collections.singletonMap("key", "key2"));
|
||||
assertEquals(Optional.of("value2"), get2);
|
||||
|
||||
final Optional<String> get3 = service.lookup(Collections.singletonMap("key", "key3"));
|
||||
assertEquals(EMPTY_STRING, get3);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class TestXMLFileLookupService {
|
||||
|
||||
final static Optional<String> EMPTY_STRING = Optional.empty();
|
||||
|
||||
@Test
|
||||
public void testXMLFileLookupService() throws InitializationException {
|
||||
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
final XMLFileLookupService service = new XMLFileLookupService();
|
||||
|
||||
runner.addControllerService("xml-file-lookup-service", service);
|
||||
runner.setProperty(service, XMLFileLookupService.CONFIGURATION_FILE, "src/test/resources/test.xml");
|
||||
runner.enableControllerService(service);
|
||||
runner.assertValid(service);
|
||||
|
||||
final XMLFileLookupService lookupService =
|
||||
(XMLFileLookupService) runner.getProcessContext()
|
||||
.getControllerServiceLookup()
|
||||
.getControllerService("xml-file-lookup-service");
|
||||
|
||||
assertThat(lookupService, instanceOf(LookupService.class));
|
||||
|
||||
final Optional<String> property1 = lookupService.lookup(Collections.singletonMap("key", "properties.property(0)"));
|
||||
assertEquals(Optional.of("this is property 1"), property1);
|
||||
|
||||
final Optional<String> property2 = lookupService.lookup(Collections.singletonMap("key", "properties.property(1)"));
|
||||
assertEquals(Optional.of("this is property 2"), property2);
|
||||
|
||||
final Optional<String> property3 = lookupService.lookup(Collections.singletonMap("key", "properties.property(2)[@value]"));
|
||||
assertEquals(Optional.of("this is property 3"), property3);
|
||||
|
||||
final Optional<String> property4 = lookupService.lookup(Collections.singletonMap("key", "properties.property(3)"));
|
||||
assertEquals(EMPTY_STRING, property4);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
"key","value","created_at"
|
||||
"property.1","this is property 1","2017-04-01"
|
||||
"property.2","this is property 2","2017-04-02"
|
|
|
@ -0,0 +1,2 @@
|
|||
property.1=this is property 1
|
||||
property.2=this is property 2
|
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" ?>
|
||||
<test>
|
||||
<properties>
|
||||
<property>this is property 1</property>
|
||||
<property>this is property 2</property>
|
||||
<property value="this is property 3" />
|
||||
</properties>
|
||||
</test>
|
Loading…
Reference in New Issue