diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java index 185ed72645..e50e843ce6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchDistributedMapCache.java @@ -26,6 +26,9 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; @@ -40,28 +43,35 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; @EventDriven @SupportsBatching @Tags({"map", "cache", "fetch", "distributed"}) @InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Computes a cache key from FlowFile attributes, for each incoming FlowFile, and fetches the value from the Distributed Map Cache associated " - + "with that key. The incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. If there is no value stored " - + "under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into " +@CapabilityDescription("Computes cache key(s) from FlowFile attributes, for each incoming FlowFile, and fetches the value(s) from the Distributed Map Cache associated " + + "with each key. If configured without a destination attribute, the incoming FlowFile's content is replaced with the binary data received by the Distributed Map Cache. " + + "If there is no value stored under that key then the flow file will be routed to 'not-found'. Note that the processor will always attempt to read the entire cached value into " + "memory before placing it in it's destination. This could be potentially problematic if the cached value is very large.") @WritesAttribute(attribute = "user-defined", description = "If the 'Put Cache Value In Attribute' property is set then whatever it is set to " - + "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache.") + + "will become the attribute key and the value would be whatever the response was from the Distributed Map Cache. If multiple cache entry identifiers are selected, " + + "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier. For example, if " + + "the Cache Entry Identifier property is set to 'id,name', and the user-defined property is named 'fetched', then two attributes will be written, " + + "fetched.id and fetched.name, containing their respective values.") @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService", "org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.processors.standard.PutDistributedMapCache"}) public class FetchDistributedMapCache extends AbstractProcessor { @@ -75,8 +85,10 @@ public class FetchDistributedMapCache extends AbstractProcessor { public static final PropertyDescriptor PROP_CACHE_ENTRY_IDENTIFIER = new PropertyDescriptor.Builder() .name("Cache Entry Identifier") - .description("A FlowFile attribute, or the results of an Attribute Expression Language statement, which will be evaluated " - + "against a FlowFile in order to determine the value used to identify duplicates; it is this value that is cached") + .description("A comma-delimited list of FlowFile attributes, or the results of Attribute Expression Language statements, which will be evaluated " + + "against a FlowFile in order to determine the value(s) used to identify duplicates; it is these values that are cached. NOTE: Only a single " + + "Cache Entry Identifier is allowed unless Put Cache Value In Attribute is specified. Multiple cache lookups are only supported when the destination " + + "is a set of attributes (see the documentation for 'Put Cache Value In Attribute' for more details including naming convention.") .required(true) .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true)) .defaultValue("${hash.value}") @@ -86,7 +98,8 @@ public class FetchDistributedMapCache extends AbstractProcessor { public static final PropertyDescriptor PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE = new PropertyDescriptor.Builder() .name("Put Cache Value In Attribute") .description("If set, the cache value received will be put into an attribute of the FlowFile instead of a the content of the" - + "FlowFile. The attribute key to put to is determined by evaluating value of this property.") + + "FlowFile. The attribute key to put to is determined by evaluating value of this property. If multiple Cache Entry Identifiers are selected, " + + "multiple attributes will be written, using the evaluated value of this property, appended by a period (.) and the name of the cache entry identifier.") .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING)) .expressionLanguageSupported(true) .build(); @@ -150,6 +163,35 @@ public class FetchDistributedMapCache extends AbstractProcessor { return relationships; } + @Override + protected Collection customValidate(ValidationContext validationContext) { + List results = new ArrayList<>(super.customValidate(validationContext)); + + PropertyValue cacheEntryIdentifier = validationContext.getProperty(PROP_CACHE_ENTRY_IDENTIFIER); + boolean elPresent = false; + try { + elPresent = cacheEntryIdentifier.isExpressionLanguagePresent(); + } catch (NullPointerException npe) { + // Unfortunate workaround to a mock framework bug (NIFI-4590) + } + + if (elPresent) { + // This doesn't do a full job of validating against the requirement that Put Cache Value In Attribute must be set if multiple + // Cache Entry Identifiers are supplied (if Expression Language is used). The user could conceivably have a comma-separated list of EL statements, + // or a single EL statement with commas inside it but that evaluates to a single item. + results.add(new ValidationResult.Builder().valid(true).explanation("Contains Expression Language").build()); + } else { + if (!validationContext.getProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet()) { + String identifierString = cacheEntryIdentifier.getValue(); + if (identifierString.contains(",")) { + results.add(new ValidationResult.Builder().valid(false) + .explanation("Multiple Cache Entry Identifiers specified without Put Cache Value In Attribute set").build()); + } + } + } + return results; + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -159,51 +201,80 @@ public class FetchDistributedMapCache extends AbstractProcessor { final ComponentLog logger = getLogger(); final String cacheKey = context.getProperty(PROP_CACHE_ENTRY_IDENTIFIER).evaluateAttributeExpressions(flowFile).getValue(); + // This block retains the previous behavior when only one Cache Entry Identifier was allowed, so as not to change the expected error message if (StringUtils.isBlank(cacheKey)) { logger.error("FlowFile {} has no attribute for given Cache Entry Identifier", new Object[]{flowFile}); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); return; } + List cacheKeys = Arrays.stream(cacheKey.split(",")).filter(path -> !StringUtils.isEmpty(path)).map(String::trim).collect(Collectors.toList()); + for (int i = 0; i < cacheKeys.size(); i++) { + if (StringUtils.isBlank(cacheKeys.get(i))) { + // Log first missing identifier, route to failure, and return + logger.error("FlowFile {} has no attribute for Cache Entry Identifier in position {}", new Object[]{flowFile, i}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + final DistributedMapCacheClient cache = context.getProperty(PROP_DISTRIBUTED_CACHE_SERVICE).asControllerService(DistributedMapCacheClient.class); try { - final byte[] cacheValue = cache.get(cacheKey, keySerializer, valueDeserializer); - - if(cacheValue==null){ - session.transfer(flowFile, REL_NOT_FOUND); - logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile}); - + final Map cacheValues; + final boolean singleKey = cacheKeys.size() == 1; + if (singleKey) { + cacheValues = new HashMap<>(1); + cacheValues.put(cacheKeys.get(0), cache.get(cacheKey, keySerializer, valueDeserializer)); } else { - boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet(); - if(putInAttribute){ - String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); - String attributeValue = new String(cacheValue,context.getProperty(PROP_CHARACTER_SET).getValue()); + cacheValues = cache.subMap(new HashSet<>(cacheKeys), keySerializer, valueDeserializer); + } + boolean notFound = false; + for(Map.Entry cacheValueEntry : cacheValues.entrySet()) { + final byte[] cacheValue = cacheValueEntry.getValue(); - int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); - if(maxLength < attributeValue.length()){ - attributeValue = attributeValue.substring(0,maxLength); + if (cacheValue == null) { + logger.info("Could not find an entry in cache for {}; routing to not-found", new Object[]{flowFile}); + notFound = true; + break; + } else { + boolean putInAttribute = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).isSet(); + if (putInAttribute) { + String attributeName = context.getProperty(PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE).evaluateAttributeExpressions(flowFile).getValue(); + if (!singleKey) { + // Append key to attribute name if multiple keys + attributeName += "." + cacheValueEntry.getKey(); + } + String attributeValue = new String(cacheValue, context.getProperty(PROP_CHARACTER_SET).getValue()); + + int maxLength = context.getProperty(PROP_PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); + if (maxLength < attributeValue.length()) { + attributeValue = attributeValue.substring(0, maxLength); + } + + flowFile = session.putAttribute(flowFile, attributeName, attributeValue); + + } else if (cacheKeys.size() > 1) { + throw new IOException("Multiple Cache Value Identifiers specified without Put Cache Value In Attribute set"); + } else { + // Write single value to content + flowFile = session.write(flowFile, out -> out.write(cacheValue)); } - flowFile = session.putAttribute(flowFile, attributeName, attributeValue); - - } else { - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write(cacheValue); - } - }); - } - - session.transfer(flowFile, REL_SUCCESS); - if(putInAttribute){ - logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile}); - }else { - logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile}); + if (putInAttribute) { + logger.info("Found a cache key of {} and added an attribute to {} with it's value.", new Object[]{cacheKey, flowFile}); + } else { + logger.info("Found a cache key of {} and replaced the contents of {} with it's value.", new Object[]{cacheKey, flowFile}); + } } } - + // If the loop was exited because a cache entry was not found, route to REL_NOT_FOUND; otherwise route to REL_SUCCESS + if (notFound) { + session.transfer(flowFile, REL_NOT_FOUND); + } else { + session.transfer(flowFile, REL_SUCCESS); + } } catch (final IOException e) { flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java index 549ad13322..4c2d9917b6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java @@ -69,6 +69,19 @@ public class TestFetchDistributedMapCache { runner.clearTransferState(); } + @Test + public void testNoCacheKeyValue() throws InitializationException { + + runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); + runner.enqueue(new byte[] {}); + runner.run(); + + // Cache key attribute evaluated to empty + runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_FAILURE, 1); + runner.assertTransferCount(FetchDistributedMapCache.REL_FAILURE, 1); + runner.clearTransferState(); + } + @Test public void testFailingCacheService() throws InitializationException, IOException { service.setFailOnCalls(true); @@ -151,6 +164,51 @@ public class TestFetchDistributedMapCache { runner.clearTransferState(); } + @Test + public void testMultipleKeysToAttributes() throws InitializationException, IOException { + service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer()); + service.put("key2","value2", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer()); + runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2"); + // Not valid to set multiple keys without Put Cache Value In Attribute set + runner.assertNotValid(); + runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test"); + runner.assertValid(); + + final Map props = new HashMap<>(); + runner.enqueue(new byte[]{}, props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_SUCCESS, 1); + runner.assertTransferCount(FetchDistributedMapCache.REL_SUCCESS, 1); + + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_SUCCESS).get(0); + outputFlowFile.assertAttributeEquals("test.key1","value1"); + outputFlowFile.assertAttributeEquals("test.key2","value2"); + } + + @Test + public void testMultipleKeysOneNotFound() throws InitializationException, IOException { + service.put("key1","value1", new FetchDistributedMapCache.StringSerializer(), new FetchDistributedMapCache.StringSerializer()); + runner.setProperty(FetchDistributedMapCache.PROP_CACHE_ENTRY_IDENTIFIER, "key1, key2"); + // Not valid to set multiple keys without Put Cache Value In Attribute set + runner.assertNotValid(); + runner.setProperty(FetchDistributedMapCache.PROP_PUT_CACHE_VALUE_IN_ATTRIBUTE, "test"); + runner.assertValid(); + + final Map props = new HashMap<>(); + runner.enqueue(new byte[]{}, props); + + runner.run(); + + runner.assertAllFlowFilesTransferred(FetchDistributedMapCache.REL_NOT_FOUND, 1); + runner.assertTransferCount(FetchDistributedMapCache.REL_NOT_FOUND, 1); + + final MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(FetchDistributedMapCache.REL_NOT_FOUND).get(0); + outputFlowFile.assertAttributeEquals("test.key1","value1"); + } + + private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { private final ConcurrentMap values = new ConcurrentHashMap<>(); private boolean failOnCalls = false; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index e593f9d1bd..d2e0085d7d 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -17,6 +17,9 @@ package org.apache.nifi.distributed.cache.client; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -117,6 +120,32 @@ public interface DistributedMapCacheClient extends ControllerService { */ V get(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException; + /** + * Returns the values in the cache for the given keys, if they exist; + * otherwise returns null + * + * @param the key type + * @param the value type + * @param keys a set of keys whose values to lookup in the map + * @param keySerializer key serializer + * @param valueDeserializer value serializer + * + * @return the value in the cache for the given key, if one exists; + * otherwise returns null + * @throws IOException ex + */ + default Map subMap(Set keys, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + // Default behavior is to iterate over the keys, calling get(key) and putting it into the results map + if (keys == null) { + return null; + } + Map results = new HashMap<>(keys.size()); + for (K key : keys) { + results.put(key, get(key, keySerializer, valueDeserializer)); + } + return results; + } + /** * Attempts to notify the server that we are finished communicating with it * and cleans up resources diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 9651c26cf4..c454063936 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -21,7 +21,10 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -204,6 +207,36 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + @Override + public Map subMap(Set keys, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + return withCommsSession(session -> { + Map response = new HashMap<>(keys.size()); + try { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("subMap"); + serialize(keys, keySerializer, dos); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + + for (K key : keys) { + final byte[] responseBuffer = readLengthDelimitedResponse(dis); + response.put(key, valueDeserializer.deserialize(responseBuffer)); + } + } catch (UnsupportedOperationException uoe) { + // If the server doesn't support subMap, just emulate it with multiple calls to get() + for (K key : keys) { + response.put(key, get(key, keySerializer, valueDeserializer)); + } + } + + return response; + }); + } + @Override public boolean remove(final K key, final Serializer serializer) throws IOException { return withCommsSession(new CommsAction() { @@ -319,7 +352,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService } session = createCommsSession(configContext); - final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1); + final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(3, 2, 1); try { ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); session.setProtocolVersion(versionNegotiator.getVersion()); @@ -368,6 +401,17 @@ public class DistributedMapCacheClientService extends AbstractControllerService baos.writeTo(dos); } + private void serialize(final Set values, final Serializer serializer, final DataOutputStream dos) throws IOException { + // Write the number of elements to follow, then each element and its size + dos.writeInt(values.size()); + for(T value : values) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + serializer.serialize(value, baos); + dos.writeInt(baos.size()); + baos.writeTo(dos); + } + } + private T withCommsSession(final CommsAction action) throws IOException { if (closed) { throw new IllegalStateException("Client is closed"); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index 8bd9bdcdcf..bbffbf9ba2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; public interface MapCache { @@ -30,6 +31,8 @@ public interface MapCache { ByteBuffer get(ByteBuffer key) throws IOException; + Map subMap(List keys) throws IOException; + ByteBuffer remove(ByteBuffer key) throws IOException; Map removeByPattern(String regex) throws IOException; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index 21090bcc48..a0a01c1a9e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -56,7 +56,7 @@ public class MapCacheServer extends AbstractCacheServer { * for details of each version enhancements. */ protected StandardVersionNegotiator getVersionNegotiator() { - return new StandardVersionNegotiator(2, 1); + return new StandardVersionNegotiator(3, 2, 1); } @Override @@ -121,6 +121,23 @@ public class MapCacheServer extends AbstractCacheServer { break; } + case "subMap": { + final int numKeys = dis.readInt(); + for(int i=0;i subMap(List keys) throws IOException { + if (keys == null) { + return null; + } + Map results = new HashMap<>(keys.size()); + for (ByteBuffer key : keys) { + results.put(key, wrapped.get(key)); + } + return results; + } + @Override public MapCacheRecord fetch(ByteBuffer key) throws IOException { return wrapped.fetch(key); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index baa2d0f3b7..df7833262b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -60,7 +60,7 @@ public class SimpleMapCache implements MapCache { @Override public String toString() { - return "SimpleSetCache[service id=" + serviceIdentifier + "]"; + return "SimpleMapCache[service id=" + serviceIdentifier + "]"; } // don't need synchronized because this method is only called when the writeLock is held, and all @@ -170,6 +170,31 @@ public class SimpleMapCache implements MapCache { } } + @Override + public Map subMap(List keys) throws IOException { + if (keys == null) { + return null; + } + Map results = new HashMap<>(keys.size()); + readLock.lock(); + try { + keys.forEach((key) -> { + final MapCacheRecord record = cache.get(key); + if (record == null) { + results.put(key, null); + } else { + inverseCacheMap.remove(record); + record.hit(); + inverseCacheMap.put(record, key); + results.put(key, record.getValue()); + } + }); + } finally { + readLock.unlock(); + } + return results; + } + @Override public ByteBuffer remove(ByteBuffer key) throws IOException { writeLock.lock(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java index 2e19714c10..0cd2813174 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java @@ -20,6 +20,8 @@ import org.apache.nifi.distributed.cache.server.EvictionPolicy; import org.junit.Test; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -89,6 +91,10 @@ public class TestSimpleMapCache { assertNull(putResult.getEvicted()); assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision()); + // Get multiple keys + Map results = cache.subMap(Arrays.asList(key1, key2, key3)); + assertEquals(3, results.size()); + } @Test