NIFI-1382 Created FetchDistributedMapCache processor

This commit is contained in:
jpercivall 2016-01-28 12:21:51 -05:00
parent 73c0637c25
commit be27c07c7f
4 changed files with 453 additions and 1 deletions

View File

@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "fetch", "distributed"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Computes a cache key from FlowFile attributes, for each incoming FlowFile, and fetches the value from the Distributed Map Cache associated "
+ "with that key. The incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. If there is no value stored "
+ "under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
+ "memory before placing it in it's destination. This could be potentially problematic if the cached value is very large.")
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Cache Value In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.PutDistributedMapCache"})
public class FetchDistributedMapCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
.name("Distributed Cache Service")
.description("The Controller Service that is used to get the cached values.")
.required(true)
.identifiesControllerService(DistributedMapCacheClient.class)
.build();
public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache Entry Identifier")
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
+ "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.defaultValue("${hash.value}")
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Put Cache Value In Attribute")
.description("If set, the cache value received will be put into an attribute of the FlowFile instead of a the content of the"
+ "FlowFile. The attribute key to put to is determined by evaluating value of this property.")
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor PROP_PUT_ATTRIBUTE_MAX_LENGTH = new PropertyDescriptor.Builder()
.name("Max Length To Put In Attribute")
.description("If routing the cache value to an attribute of the FlowFile (by setting the \"Put Cache Value in attribute\" "
+ "property), the number of characters put to the attribute value will be at most this amount. This is important because "
+ "attributes are held in memory and large attributes will quickly cause out of memory issues. If the output goes "
+ "longer than this value, it will be truncated to fit. Consider making this smaller if able.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("256")
.build();
public static final PropertyDescriptor PROP_CHARACTER_SET = new PropertyDescriptor.Builder()
.name("Character Set")
.description("The Character Set in which the cached value is encoded. This will only be used when routing to an attribute.")
.required(false)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("If the cache was successfully communicated with it will be routed to this relationship")
.build();
public static final Relationship REL_NOT_FOUND = new Relationship.Builder()
.name("not-found")
.description("If a FlowFile's Cache Entry Identifier was not found in the cache, it will be routed to this relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("If unable to communicate with the cache or if the cache entry is evaluated to be blank, the FlowFile will be penalized and routed to this relationship")
.build();
private final Set<Relationship> relationships;
private final Serializer<String> keySerializer = new StringSerializer();
private final Deserializer<byte[]> valueDeserializer = new CacheValueDeserializer();
public FetchDistributedMapCache() {
final Set<Relationship> rels = new HashSet<>();
rels.add(REL_SUCCESS);
rels.add(REL_NOT_FOUND);
rels.add(REL_FAILURE);
relationships = Collections.unmodifiableSet(rels);
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROP_CACHE_ENTRY_IDENTIFIER);
descriptors.add(PROP_DISTRIBUTED_CACHE_SERVICE);
descriptors.add(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE);
descriptors.add(PROP_PUT_ATTRIBUTE_MAX_LENGTH);
descriptors.add(PROP_CHARACTER_SET);
return descriptors;
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final ProcessorLog logger = getLogger();
final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
try {
final byte[] cacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
if(cacheValue==null){
session.transfer(flowFile, REL_NOT_FOUND);
logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
} else {
boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet();
if(putInAttribute){
String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
String attributeValue = new String(cacheValue,context.getProperty(PROP_CHARACTER_SET).getValue());
int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
if(maxLength < attributeValue.length()){
attributeValue = attributeValue.substring(0,maxLength);
}
flowFile = session.putAttribute(flowFile, attributeName, attributeValue);
} else {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(cacheValue);
}
});
}
session.transfer(flowFile, REL_SUCCESS);
if(putInAttribute){
logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
}else {
logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
}
}
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
logger.error("Unable to communicate with cache when processing {} due to {}", new Object[]{flowFile, e});
}
}
public static class CacheValueDeserializer implements Deserializer<byte[]> {
@Override
public byte[] deserialize(final byte[] input) throws DeserializationException, IOException {
if (input == null || input.length == 0) {
return null;
}
return input;
}
}
public static class StringSerializer implements Serializer<String> {
@Override
public void serialize(final String value, final OutputStream out) throws SerializationException, IOException {
out.write(value.getBytes(StandardCharsets.UTF_8));
}
}
}

View File

@ -62,7 +62,8 @@ import org.apache.nifi.processor.util.StandardValidators;
"'keep original' the entry is not replaced.'")
@WritesAttribute(attribute = "cached", description = "All FlowFiles will have an attribute 'cached'. The value of this " +
"attribute is true, is the FlowFile is cached, otherwise false.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer"})
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.FetchDistributedMapCache"})
public class PutDistributedMapCache extends AbstractProcessor {
public static final String CACHED_ATTRIBUTE_NAME = "cached";

View File

@ -81,3 +81,4 @@ org.apache.nifi.processors.standard.TransformXml
org.apache.nifi.processors.standard.UnpackContent
org.apache.nifi.processors.standard.ValidateXml
org.apache.nifi.processors.standard.ExecuteSQL
org.apache.nifi.processors.standard.FetchDistributedMapCache

View File

@ -0,0 +1,216 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class TestFetchDistributedMapCache {
private TestRunner runner;
private MockCacheClient service;
@Before
public void setup() throws InitializationException {
runner = TestRunners.newTestRunner(FetchDistributedMapCache.class);
service = new MockCacheClient();
runner.addControllerService("service", service);
runner.enableControllerService(service);
runner.setProperty(FetchDistributedMapCache.PROP_DISTRIBUTED_CACHE_SERVICE, "service");
}
@Test
public void testNoCacheKey() throws InitializationException {
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
final Map<String, String> props = new HashMap<>();
props.put("cacheKeyAttribute", "1");
runner.enqueue(new byte[] {},props);
runner.run();
// no cache key attribute
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_NOT_FOUND, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_NOT_FOUND, 1);
runner.clearTransferState();
}
@Test
public void testFailingCacheService() throws InitializationException, IOException {
service.setFailOnCalls(true);
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
final Map<String, String> props = new HashMap<>();
props.put("cacheKeyAttribute", "2");
runner.enqueue(new byte[] {}, props);
runner.run();
//Expect the processor to receive an IO exception from the cache service and route to failure
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_FAILURE, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_FAILURE, 1);
service.setFailOnCalls(false);
}
@Test
public void testSingleFlowFile() throws InitializationException, IOException {
service.put("key","value", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
final Map<String, String> props = new HashMap<>();
props.put("cacheKeyAttribute", "key");
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), props);
runner.run();
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0);
outputFlowFile.assertContentEquals("value");
runner.clearTransferState();
}
@Test
public void testSingleFlowFileToAttribute() throws InitializationException, IOException {
service.put("key","value", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
final Map<String, String> props = new HashMap<>();
props.put("cacheKeyAttribute", "key");
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), props);
runner.run();
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0);
outputFlowFile.assertAttributeEquals("test","value");
runner.clearTransferState();
}
@Test
public void testToAttributeTooLong() throws InitializationException, IOException {
service.put("key","value", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
runner.setProperty(FetchDistributedMapCache.PROP_PUT_ATTRIBUTE_MAX_LENGTH, "3");
final Map<String, String> props = new HashMap<>();
props.put("cacheKeyAttribute", "key");
String flowFileContent = "content";
runner.enqueue(flowFileContent.getBytes("UTF-8"), props);
runner.run();
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0);
outputFlowFile.assertAttributeEquals("test","val");
runner.clearTransferState();
}
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;
public void setFailOnCalls(boolean failOnCalls){
this.failOnCalls = failOnCalls;
}
private void verifyNotFail() throws IOException {
if (failOnCalls) {
throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
}
}
@Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
verifyNotFail();
final Object retValue = values.putIfAbsent(key, value);
return (retValue == null);
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
return (V) values.putIfAbsent(key, value);
}
@Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
verifyNotFail();
return values.containsKey(key);
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
verifyNotFail();
values.put(key, value);
}
@Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail();
if(values.containsKey(key)) {
return (V) ((String) values.get(key)).getBytes();
} else {
return null;
}
}
@Override
public void close() throws IOException {
}
@Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
verifyNotFail();
values.remove(key);
return true;
}
}
}