diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index ef0bd59e99..99c8ef511e 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -244,7 +244,12 @@ public class TestDetectDuplicate { @Override public long removeByPattern(String regex) throws IOException { - return exists ? 1L : 0L; + if (exists) { + exists = false; + return 1L; + } else { + return 0L; + } } @Override diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index d2e0085d7d..7fa6a6148b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -166,6 +166,23 @@ public interface DistributedMapCacheClient extends ControllerService { */ boolean remove(K key, Serializer serializer) throws IOException; + /** + * Removes the entry with the given key from the cache, if it is present, + * and returns the value that was removed from the map. + * + * @param type of key + * @param type of value + * @param key key + * @param keySerializer key serializer + * @param valueDeserializer value deserializer + * @return the value previously associated with the key, or null if there was no mapping + * null can also indicate that the map previously associated null with the key + * @throws IOException ex + */ + default V removeAndGet(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + throw new UnsupportedOperationException(); + } + /** * Removes entries whose keys match the specified pattern * @@ -174,4 +191,32 @@ public interface DistributedMapCacheClient extends ControllerService { * @throws IOException if any error occurred while removing an entry */ long removeByPattern(String regex) throws IOException; + + /** + * Removes entries whose keys match the specified pattern, and returns a map of entries that + * were removed. + * + * @param type of key + * @param type of value + * @param regex The regular expression / pattern on which to match the keys to be removed + * @param keyDeserializer key deserializer + * @param valueDeserializer value deserializer + * @return A map of key/value entries that were removed from the cache + * @throws IOException if any error occurred while removing an entry + */ + default Map removeByPatternAndGet(String regex, Deserializer keyDeserializer, Deserializer valueDeserializer) throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Returns a set of all keys currently in the cache + * + * @param type of key + * @param keyDeserializer key deserializer + * @return a Set of all keys currently in the cache + * @throws IOException ex + */ + default Set keySet(Deserializer keyDeserializer) throws IOException { + throw new UnsupportedOperationException(); + } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index c454063936..e655121672 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -22,6 +22,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -255,6 +256,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + @Override + public V removeAndGet(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + return withCommsSession(new CommsAction() { + @Override + public V execute(final CommsSession session) throws IOException { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("removeAndGet"); + + serialize(key, keySerializer, dos); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final byte[] responseBuffer = readLengthDelimitedResponse(dis); + return valueDeserializer.deserialize(responseBuffer); + } + }); + } + @Override public long removeByPattern(String regex) throws IOException { return withCommsSession(session -> { @@ -269,6 +291,34 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + @Override + public Map removeByPatternAndGet(String regex, Deserializer keyDeserializer, Deserializer valueDeserializer) throws IOException { + return withCommsSession(new CommsAction>() { + @Override + public Map execute(CommsSession session) throws IOException { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("removeByPatternAndGet"); + dos.writeUTF(regex); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final int mapSize = dis.readInt(); + HashMap resultMap = new HashMap<>(mapSize); + for (int i=0; i AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { @@ -321,6 +371,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService }); } + @Override + public Set keySet(Deserializer keyDeserializer) throws IOException { + return withCommsSession(session -> { + validateProtocolVersion(session, 3); + + final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); + dos.writeUTF("keySet"); + dos.flush(); + + // read response + final DataInputStream dis = new DataInputStream(session.getInputStream()); + final int setSize = dis.readInt(); + HashSet resultSet = new HashSet<>(setSize); + for (int i=0; i * *

DistributedMapCache version histories:

    + *
  • 3: Added subMap, keySet, removeAndGet, removeByPatternAndGet methods.
  • *
  • 2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.
  • *
  • 1: Initial version.
  • *

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java index 3dd224b3fb..3e6e7a9f03 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java @@ -17,6 +17,7 @@ package org.apache.nifi.distributed.cache.server; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -29,7 +30,6 @@ import org.apache.nifi.distributed.cache.server.set.PersistentSetCache; import org.apache.nifi.distributed.cache.server.set.SetCache; import org.apache.nifi.distributed.cache.server.set.SetCacheResult; import org.apache.nifi.distributed.cache.server.set.SimpleSetCache; -import org.apache.nifi.stream.io.DataOutputStream; public class SetCacheServer extends AbstractCacheServer { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java index bbffbf9ba2..e007ff0f32 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Set; public interface MapCache { @@ -41,5 +42,7 @@ public interface MapCache { MapPutResult replace(MapCacheRecord record) throws IOException; + Set keySet() throws IOException; + void shutdown() throws IOException; } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java index a0a01c1a9e..57af28ef2e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Set; import javax.net.ssl.SSLContext; @@ -144,12 +145,47 @@ public class MapCacheServer extends AbstractCacheServer { dos.writeBoolean(removed); break; } + case "removeAndGet": { + final byte[] key = readValue(dis); + final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key)); + if (removed == null) { + // there was no value removed + dos.writeInt(0); + } else { + // reply with the value that was removed + final byte[] byteArray = removed.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + break; + } case "removeByPattern": { final String pattern = dis.readUTF(); final Map removed = cache.removeByPattern(pattern); dos.writeLong(removed == null ? 0 : removed.size()); break; } + case "removeByPatternAndGet": { + final String pattern = dis.readUTF(); + final Map removed = cache.removeByPattern(pattern); + if (removed == null || removed.size() == 0) { + dos.writeLong(0); + } else { + // write the map size + dos.writeInt(removed.size()); + for (Map.Entry entry : removed.entrySet()) { + // write map entry key + final byte[] key = entry.getKey().array(); + dos.writeInt(key.length); + dos.write(key); + // write map entry value + final byte[] value = entry.getValue().array(); + dos.writeInt(value.length); + dos.write(value); + } + } + break; + } case "fetch": { final byte[] key = readValue(dis); final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key)); @@ -175,6 +211,18 @@ public class MapCacheServer extends AbstractCacheServer { dos.writeBoolean(result.isSuccessful()); break; } + case "keySet": { + final Set result = cache.keySet(); + // write the set size + dos.writeInt(result.size()); + // write each key in the set + for (ByteBuffer bb : result) { + final byte[] byteArray = bb.array(); + dos.writeInt(byteArray.length); + dos.write(byteArray); + } + break; + } default: { throw new IOException("Illegal Request"); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 6bf6e5a688..c1eebd61ee 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -170,6 +171,11 @@ public class PersistentMapCache implements MapCache { return removeResult; } + @Override + public Set keySet() throws IOException { + return wrapped.keySet(); + } + @Override public void shutdown() throws IOException { wali.shutdown(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index df7833262b..8571432549 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.Lock; @@ -273,6 +274,16 @@ public class SimpleMapCache implements MapCache { } } + @Override + public Set keySet() throws IOException { + readLock.lock(); + try { + return cache.keySet(); + } finally { + readLock.unlock(); + } + } + @Override public void shutdown() throws IOException { } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index 419a471f8d..e2a74d47b5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SystemUtils; @@ -274,6 +275,13 @@ public class TestServerAndClient { assertTrue(contains); assertTrue(contains2); + final Deserializer deserializer = new StringDeserializer(); + final Set keys = client.keySet(deserializer); + assertEquals(3, keys.size()); + assertTrue(keys.contains("test")); + assertTrue(keys.contains("test2")); + assertTrue(keys.contains("test3")); + final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer); assertFalse(addedAgain); @@ -307,6 +315,19 @@ public class TestServerAndClient { assertFalse(client.containsKey("test.2", serializer)); assertTrue(client.containsKey("test3", serializer)); + // test removeByPatternAndGet + client.put("test.1", "1", serializer, serializer); + client.put("test.2", "2", serializer, serializer); + Map removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer); + assertEquals(2, removed.size()); + assertTrue(removed.containsKey("test.1")); + assertTrue(removed.containsKey("test.2")); + assertFalse(client.containsKey("test.1", serializer)); + assertFalse(client.containsKey("test.2", serializer)); + assertTrue(client.containsKey("test3", serializer)); + removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer); + assertEquals(0, removed.size()); + newServer.shutdownServer(); client.close(); } @@ -437,6 +458,16 @@ public class TestServerAndClient { assertTrue(removed); LOGGER.debug("end remove"); + client.put("testKey", "testValue", keySerializer, valueSerializer); + assertTrue(client.containsKey("testKey", keySerializer)); + String removedValue = client.removeAndGet("testKey", keySerializer, deserializer); + assertEquals("testValue", removedValue); + removedValue = client.removeAndGet("testKey", keySerializer, deserializer); + assertNull(removedValue); + + final Set keys = client.keySet(deserializer); + assertEquals(0, keys.size()); + // Test removeByPattern, the first two should be removed and the last should remain client.put("test.1", "1", keySerializer, keySerializer); client.put("test.2", "2", keySerializer, keySerializer); @@ -687,6 +718,23 @@ public class TestServerAndClient { } catch (UnsupportedOperationException e) { } + try { + Set keys = client.keySet(stringDeserializer); + fail("Version 3 operations should NOT work."); + } catch (UnsupportedOperationException e) { + } + + try { + String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer); + fail("Version 3 operations should NOT work."); + } catch (UnsupportedOperationException e) { + } + + try { + Map removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer); + fail("Version 3 operations should NOT work."); + } catch (UnsupportedOperationException e) { + } client.close(); server.shutdownServer(); }