diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml index 29b6435d8e..ad233bb89c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/pom.xml @@ -32,6 +32,10 @@ org.apache.nifi nifi-lookup-service-api + + org.apache.nifi + nifi-distributed-cache-client-service-api + org.apache.nifi nifi-utils diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java new file mode 100644 index 0000000000..3b2ffa233d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/java/org/apache/nifi/lookup/DistributedMapCacheLookupService.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.lookup; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.distributed.cache.client.exception.DeserializationException; +import org.apache.nifi.distributed.cache.client.exception.SerializationException; +import org.apache.nifi.expression.ExpressionLanguageScope; + +@Tags({"lookup", "enrich", "key", "value", "map", "cache", "distributed"}) +@CapabilityDescription("Allows to choose a distributed map cache client to retrieve the value associated to a key. " + + "The coordinates that are passed to the lookup must contain the key 'key'.") +public class DistributedMapCacheLookupService extends AbstractControllerService implements StringLookupService { + + private static final List STANDARD_CHARSETS = Arrays.asList( + StandardCharsets.UTF_8, + StandardCharsets.US_ASCII, + StandardCharsets.ISO_8859_1, + StandardCharsets.UTF_16, + StandardCharsets.UTF_16LE, + StandardCharsets.UTF_16BE); + + private static final String KEY = "key"; + private static final Set REQUIRED_KEYS = Stream.of(KEY).collect(Collectors.toSet()); + + private volatile DistributedMapCacheClient cache; + private volatile static Charset charset; + private final Serializer keySerializer = new StringSerializer(); + private final Deserializer valueDeserializer = new StringDeserializer(); + + public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder() + .name("distributed-map-cache-service") + .displayName("Distributed Cache Service") + .description("The Controller Service that is used to get the cached values.") + .required(true) + .identifiesControllerService(DistributedMapCacheClient.class) + .build(); + + public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() + .name("character-encoding") + .displayName("Character Encoding") + .description("Specifies a character encoding to use.") + .required(true) + .allowableValues(getStandardCharsetNames()) + .defaultValue(StandardCharsets.UTF_8.displayName()) + .build(); + + private static Set getStandardCharsetNames() { + return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet()); + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .dynamic(true) + .addValidator(Validator.VALID) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); + charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).getValue()); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List descriptors = new ArrayList<>(); + descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE); + descriptors.add(CHARACTER_ENCODING); + return descriptors; + } + + @Override + public Optional lookup(final Map coordinates) { + if (coordinates == null) { + return Optional.empty(); + } + + final String key = coordinates.get(KEY).toString(); + if (key == null) { + return Optional.empty(); + } + + try { + return Optional.ofNullable(cache.get(key, keySerializer, valueDeserializer)); + } catch (IOException e) { + getLogger().error("Error while trying to get the value from distributed map cache with key = " + key, e); + return Optional.empty(); + } + } + + @Override + public Set getRequiredKeys() { + return REQUIRED_KEYS; + } + + public static class StringDeserializer implements Deserializer { + @Override + public String deserialize(final byte[] input) throws DeserializationException, IOException { + if (input == null || input.length == 0) { + return null; + } + return new String(input, 0, input.length, charset); + } + } + + public static class StringSerializer implements Serializer { + @Override + public void serialize(final String value, final OutputStream out) throws SerializationException, IOException { + out.write(value.getBytes(charset)); + } + } + +} diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 34395e8720..631fdaa6ca 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -19,3 +19,4 @@ org.apache.nifi.lookup.RestLookupService org.apache.nifi.lookup.SimpleKeyValueLookupService org.apache.nifi.lookup.SimpleCsvFileLookupService org.apache.nifi.lookup.XMLFileLookupService +org.apache.nifi.lookup.DistributedMapCacheLookupService diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java new file mode 100644 index 0000000000..6824107fff --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.lookup; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestDistributedMapCacheLookupService { + + final static Optional EMPTY_STRING = Optional.empty(); + + @Test + public void testDistributedMapCacheLookupService() throws InitializationException { + final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); + final DistributedMapCacheLookupService service = new DistributedMapCacheLookupService(); + final DistributedMapCacheClient client = new DistributedMapCacheClientImpl(); + + runner.addControllerService("client", client); + runner.addControllerService("lookup-service", service); + runner.setProperty(service, DistributedMapCacheLookupService.PROP_DISTRIBUTED_CACHE_SERVICE, "client"); + + runner.enableControllerService(client); + runner.enableControllerService(service); + + runner.assertValid(service); + + assertThat(service, instanceOf(LookupService.class)); + + final Optional get = service.lookup(Collections.singletonMap("key", "myKey")); + assertEquals(Optional.of("myValue"), get); + + final Optional absent = service.lookup(Collections.singletonMap("key", "absentKey")); + assertEquals(EMPTY_STRING, absent); + } + + static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { + + private Map map = new HashMap(); + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + map.put("myKey", "myValue"); + } + + @Override + public void close() throws IOException { + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + } + + @Override + protected java.util.List getSupportedPropertyDescriptors() { + return new ArrayList<>(); + } + + @Override + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, + final Deserializer valueDeserializer) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + @SuppressWarnings("unchecked") + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + return (V) map.get(key); + } + + @Override + public boolean remove(final K key, final Serializer serializer) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public long removeByPattern(String regex) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + throw new UnsupportedOperationException("not implemented"); + } + } + +}