NIFI-4589: Allow multiple keys in FetchDistributedMapCache, add subMap operation to API

Signed-off-by: Mike Moser <mosermw@apache.org>

This closes #2260.
This commit is contained in:
Matthew Burgess 2017-11-09 10:31:16 -05:00 committed by Mike Moser
parent 412b3fbbe2
commit 16a23f5a0f
9 changed files with 306 additions and 41 deletions

View File

@ -26,6 +26,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
@ -40,28 +43,35 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@EventDriven
@SupportsBatching
@Tags({"map", "cache", "fetch", "distributed"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Computes a cache key from FlowFile attributes, for each incoming FlowFile, and fetches the value from the Distributed Map Cache associated "
+ "with that key. The incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. If there is no value stored "
+ "under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
@CapabilityDescription("Computes cache key(s) from FlowFile attributes, for each incoming FlowFile, and fetches the value(s) from the Distributed Map Cache associated "
+ "with each key. If configured without a destination attribute, the incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. "
+ "If there is no value stored under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into "
+ "memory before placing it in it's destination. This could be potentially problematic if the cached value is very large.")
@WritesAttribute(attribute = "user-defined", description = "If the 'Put Cache Value In Attribute' property is set then whatever it is set to "
+ "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache.")
+ "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache. If multiple cache entry identifiers are selected, "
+ "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier. For example, if "
+ "the Cache Entry Identifier property is set to 'id,name', and the user-defined property is named 'fetched', then two attributes will be written, "
+ "fetched.id and fetched.name, containing their respective values.")
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer",
"org.apache.nifi.processors.standard.PutDistributedMapCache"})
public class FetchDistributedMapCache extends AbstractProcessor {
@ -75,8 +85,10 @@ public class FetchDistributedMapCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder()
.name("Cache Entry Identifier")
.description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated "
+ "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached")
.description("A comma-delimited list of FlowFile attributes, or the results of Attribute Expression Language statements, which will be evaluated "
+ "against a FlowFile in order to determine the value(s) used to identify duplicates; it is these values that are cached. NOTE: Only a single "
+ "Cache Entry Identifier is allowed unless Put Cache Value In Attribute is specified. Multiple cache lookups are only supported when the destination "
+ "is a set of attributes (see the documentation for 'Put Cache Value In Attribute' for more details including naming convention.")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
.defaultValue("${hash.value}")
@ -86,7 +98,8 @@ public class FetchDistributedMapCache extends AbstractProcessor {
public static final PropertyDescriptor PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Put Cache Value In Attribute")
.description("If set, the cache value received will be put into an attribute of the FlowFile instead of a the content of the"
+ "FlowFile. The attribute key to put to is determined by evaluating value of this property.")
+ "FlowFile. The attribute key to put to is determined by evaluating value of this property. If multiple Cache Entry Identifiers are selected, "
+ "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier.")
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING))
.expressionLanguageSupported(true)
.build();
@ -150,6 +163,35 @@ public class FetchDistributedMapCache extends AbstractProcessor {
return relationships;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
List<ValidationResult> results = new ArrayList<>(super.customValidate(validationContext));
PropertyValue cacheEntryIdentifier = validationContext.getProperty(PROP_CACHE_ENTRY_IDENTIFIER);
boolean elPresent = false;
try {
elPresent = cacheEntryIdentifier.isExpressionLanguagePresent();
} catch (NullPointerException npe) {
// Unfortunate workaround to a mock framework bug (NIFI-4590)
}
if (elPresent) {
// This doesn't do a full job of validating against the requirement that Put Cache Value In Attribute must be set if multiple
// Cache Entry Identifiers are supplied (if Expression Language is used). The user could conceivably have a comma-separated list of EL statements,
// or a single EL statement with commas inside it but that evaluates to a single item.
results.add(new ValidationResult.Builder().valid(true).explanation("Contains Expression Language").build());
} else {
if (!validationContext.getProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet()) {
String identifierString = cacheEntryIdentifier.getValue();
if (identifierString.contains(",")) {
results.add(new ValidationResult.Builder().valid(false)
.explanation("Multiple Cache Entry Identifiers specified without Put Cache Value In Attribute set").build());
}
}
}
return results;
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
@ -159,25 +201,51 @@ public class FetchDistributedMapCache extends AbstractProcessor {
final ComponentLog logger = getLogger();
final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue();
// This block retains the previous behavior when only one Cache Entry Identifier was allowed, so as not to change the expected error message
if (StringUtils.isBlank(cacheKey)) {
logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
List<String> cacheKeys = Arrays.stream(cacheKey.split(",")).filter(path -> !StringUtils.isEmpty(path)).map(String::trim).collect(Collectors.toList());
for (int i = 0; i < cacheKeys.size(); i++) {
if (StringUtils.isBlank(cacheKeys.get(i))) {
// Log first missing identifier, route to failure, and return
logger.error("FlowFile {} has no attribute for Cache Entry Identifier in position {}", new Object[]{flowFile, i});
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
}
}
final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class);
try {
final byte[] cacheValue = cache.get(cacheKey, keySerializer, valueDeserializer);
final Map<String, byte[]> cacheValues;
final boolean singleKey = cacheKeys.size() == 1;
if (singleKey) {
cacheValues = new HashMap<>(1);
cacheValues.put(cacheKeys.get(0), cache.get(cacheKey, keySerializer, valueDeserializer));
} else {
cacheValues = cache.subMap(new HashSet<>(cacheKeys), keySerializer, valueDeserializer);
}
boolean notFound = false;
for(Map.Entry<String,byte[]> cacheValueEntry : cacheValues.entrySet()) {
final byte[] cacheValue = cacheValueEntry.getValue();
if (cacheValue == null) {
session.transfer(flowFile, REL_NOT_FOUND);
logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile});
notFound = true;
break;
} else {
boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet();
if (putInAttribute) {
String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue();
if (!singleKey) {
// Append key to attribute name if multiple keys
attributeName += "." + cacheValueEntry.getKey();
}
String attributeValue = new String(cacheValue, context.getProperty(PROP_CHARACTER_SET).getValue());
int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
@ -187,23 +255,26 @@ public class FetchDistributedMapCache extends AbstractProcessor {
flowFile = session.putAttribute(flowFile, attributeName, attributeValue);
} else if (cacheKeys.size() > 1) {
throw new IOException("Multiple Cache Value Identifiers specified without Put Cache Value In Attribute set");
} else {
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(cacheValue);
}
});
// Write single value to content
flowFile = session.write(flowFile, out -> out.write(cacheValue));
}
session.transfer(flowFile, REL_SUCCESS);
if (putInAttribute) {
logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile});
} else {
logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile});
}
}
}
// If the loop was exited because a cache entry was not found, route to REL_NOT_FOUND; otherwise route to REL_SUCCESS
if (notFound) {
session.transfer(flowFile, REL_NOT_FOUND);
} else {
session.transfer(flowFile, REL_SUCCESS);
}
} catch (final IOException e) {
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);

View File

@ -69,6 +69,19 @@ public class TestFetchDistributedMapCache {
runner.clearTransferState();
}
@Test
public void testNoCacheKeyValue() throws InitializationException {
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
runner.enqueue(new byte[] {});
runner.run();
// Cache key attribute evaluated to empty
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_FAILURE, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_FAILURE, 1);
runner.clearTransferState();
}
@Test
public void testFailingCacheService() throws InitializationException, IOException {
service.setFailOnCalls(true);
@ -151,6 +164,51 @@ public class TestFetchDistributedMapCache {
runner.clearTransferState();
}
@Test
public void testMultipleKeysToAttributes() throws InitializationException, IOException {
service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
service.put("key2","value2", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2");
// Not valid to set multiple keys without Put Cache Value In Attribute set
runner.assertNotValid();
runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
runner.assertValid();
final Map<String, String> props = new HashMap<>();
runner.enqueue(new byte[]{}, props);
runner.run();
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0);
outputFlowFile.assertAttributeEquals("test.key1","value1");
outputFlowFile.assertAttributeEquals("test.key2","value2");
}
@Test
public void testMultipleKeysOneNotFound() throws InitializationException, IOException {
service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer());
runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2");
// Not valid to set multiple keys without Put Cache Value In Attribute set
runner.assertNotValid();
runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test");
runner.assertValid();
final Map<String, String> props = new HashMap<>();
runner.enqueue(new byte[]{}, props);
runner.run();
runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_NOT_FOUND, 1);
runner.assertTransferCount(FetchDistributedMapCache.REL_NOT_FOUND, 1);
final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_NOT_FOUND).get(0);
outputFlowFile.assertAttributeEquals("test.key1","value1");
}
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false;

View File

@ -17,6 +17,9 @@
package org.apache.nifi.distributed.cache.client;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
@ -117,6 +120,32 @@ public interface DistributedMapCacheClient extends ControllerService {
*/
<K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
/**
* Returns the values in the cache for the given keys, if they exist;
* otherwise returns <code>null</code>
*
* @param <K> the key type
* @param <V> the value type
* @param keys a set of keys whose values to lookup in the map
* @param keySerializer key serializer
* @param valueDeserializer value serializer
*
* @return the value in the cache for the given key, if one exists;
* otherwise returns <code>null</code>
* @throws IOException ex
*/
default <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
// Default behavior is to iterate over the keys, calling get(key) and putting it into the results map
if (keys == null) {
return null;
}
Map<K, V> results = new HashMap<>(keys.size());
for (K key : keys) {
results.put(key, get(key, keySerializer, valueDeserializer));
}
return results;
}
/**
* Attempts to notify the server that we are finished communicating with it
* and cleans up resources

View File

@ -21,7 +21,10 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@ -204,6 +207,36 @@ public class DistributedMapCacheClientService extends AbstractControllerService
});
}
@Override
public <K, V> Map<K, V> subMap(Set<K> keys, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(session -> {
Map<K, V> response = new HashMap<>(keys.size());
try {
validateProtocolVersion(session, 3);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("subMap");
serialize(keys, keySerializer, dos);
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
for (K key : keys) {
final byte[] responseBuffer = readLengthDelimitedResponse(dis);
response.put(key, valueDeserializer.deserialize(responseBuffer));
}
} catch (UnsupportedOperationException uoe) {
// If the server doesn't support subMap, just emulate it with multiple calls to get()
for (K key : keys) {
response.put(key, get(key, keySerializer, valueDeserializer));
}
}
return response;
});
}
@Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
return withCommsSession(new CommsAction<Boolean>() {
@ -319,7 +352,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
session = createCommsSession(configContext);
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(3, 2, 1);
try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
session.setProtocolVersion(versionNegotiator.getVersion());
@ -368,6 +401,17 @@ public class DistributedMapCacheClientService extends AbstractControllerService
baos.writeTo(dos);
}
private <T> void serialize(final Set<T> values, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
// Write the number of elements to follow, then each element and its size
dos.writeInt(values.size());
for(T value : values) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
serializer.serialize(value, baos);
dos.writeInt(baos.size());
baos.writeTo(dos);
}
}
private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
if (closed) {
throw new IllegalStateException("Client is closed");

View File

@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
public interface MapCache {
@ -30,6 +31,8 @@ public interface MapCache {
ByteBuffer get(ByteBuffer key) throws IOException;
Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException;
ByteBuffer remove(ByteBuffer key) throws IOException;
Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException;

View File

@ -56,7 +56,7 @@ public class MapCacheServer extends AbstractCacheServer {
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(2, 1);
return new StandardVersionNegotiator(3, 2, 1);
}
@Override
@ -121,6 +121,23 @@ public class MapCacheServer extends AbstractCacheServer {
break;
}
case "subMap": {
final int numKeys = dis.readInt();
for(int i=0;i<numKeys;i++) {
final byte[] key = readValue(dis);
final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
if (existingValue == null) {
// there was no existing value.
dos.writeInt(0);
} else {
// a value already existed.
final byte[] byteArray = existingValue.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
}
break;
}
case "remove": {
final byte[] key = readValue(dis);
final boolean removed = cache.remove(ByteBuffer.wrap(key)) != null;

View File

@ -110,6 +110,18 @@ public class PersistentMapCache implements MapCache {
return wrapped.get(key);
}
@Override
public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException {
if (keys == null) {
return null;
}
Map<ByteBuffer, ByteBuffer> results = new HashMap<>(keys.size());
for (ByteBuffer key : keys) {
results.put(key, wrapped.get(key));
}
return results;
}
@Override
public MapCacheRecord fetch(ByteBuffer key) throws IOException {
return wrapped.fetch(key);

View File

@ -60,7 +60,7 @@ public class SimpleMapCache implements MapCache {
@Override
public String toString() {
return "SimpleSetCache[service id=" + serviceIdentifier + "]";
return "SimpleMapCache[service id=" + serviceIdentifier + "]";
}
// don't need synchronized because this method is only called when the writeLock is held, and all
@ -170,6 +170,31 @@ public class SimpleMapCache implements MapCache {
}
}
@Override
public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException {
if (keys == null) {
return null;
}
Map<ByteBuffer, ByteBuffer> results = new HashMap<>(keys.size());
readLock.lock();
try {
keys.forEach((key) -> {
final MapCacheRecord record = cache.get(key);
if (record == null) {
results.put(key, null);
} else {
inverseCacheMap.remove(record);
record.hit();
inverseCacheMap.put(record, key);
results.put(key, record.getValue());
}
});
} finally {
readLock.unlock();
}
return results;
}
@Override
public ByteBuffer remove(ByteBuffer key) throws IOException {
writeLock.lock();

View File

@ -20,6 +20,8 @@ import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -89,6 +91,10 @@ public class TestSimpleMapCache {
assertNull(putResult.getEvicted());
assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision());
// Get multiple keys
Map<ByteBuffer, ByteBuffer> results = cache.subMap(Arrays.asList(key1, key2, key3));
assertEquals(3, results.size());
}
@Test