diff --git a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java index 34a086deb2..bbcfab70fd 100644 --- a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java +++ b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services-api/src/main/java/org/apache/nifi/hazelcast/services/cache/HazelcastCache.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.hazelcast.services.cache; +import java.util.Set; import java.util.function.Predicate; /** @@ -32,7 +33,14 @@ public interface HazelcastCache { String name(); /** - * Returns the value of the cache entry defined by the the key. + * Returns a set of all keys currently in the cache + * + * @return The Set of all keys currently in the cache + */ + Set keySet(); + + /** + * Returns the value of the cache entry defined by the key. * * @param key Key of the entry, must not be null. * diff --git a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java index 47d7151432..18d1631962 100644 --- a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java +++ b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCache.java @@ -51,6 +51,11 @@ public class IMapBasedHazelcastCache implements HazelcastCache { return storage.getName(); } + @Override + public Set keySet() { + return storage.keySet(); + } + @Override public byte[] get(final String key) { return storage.get(key); diff --git a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java index ebca07e2a7..9863acf5fd 100644 --- a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java +++ b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java @@ -34,23 +34,23 @@ import org.apache.nifi.processor.util.StandardValidators; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.Serializable; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; -import java.util.regex.Pattern; /** * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. * * Note: By design, the client should not directly depend on Hazelcast specific classes to allow easy version and implementation changes. */ -@Tags({ "hazelcast", "cache", "map"}) +@Tags({"hazelcast", "cache", "map"}) @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + "an other controller service, manages the actual Hazelcast calls, set in Hazelcast Cache Manager.") public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { @@ -116,9 +116,18 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement } } + @Override + public Set keySet(Deserializer keyDeserializer) throws IOException { + final HashSet keySet = new HashSet<>(); + for (String key : cache.keySet()) { + keySet.add(parseCacheEntryKey(key, keyDeserializer)); + } + return keySet; + } + @Override public AtomicCacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { - final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + final byte[] result = cache.get(serializeCacheEntryKey(key, keySerializer)); return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseRevision(result)); } @@ -128,18 +137,18 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement return false; } - final String key = getCacheEntryKey(entry.getKey(), keySerializer); + final String key = serializeCacheEntryKey(entry.getKey(), keySerializer); try (final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) { final byte[] oldValue = cache.get(key); - if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_REVISION)) { - cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_REVISION)); + if (oldValue == null && (entry.getRevision().isEmpty() || entry.getRevision().get() < STARTING_REVISION)) { + cache.put(key, serializeCacheEntryValue(entry.getValue(), valueSerializer, STARTING_REVISION)); getLogger().debug("Entry with key {} was added during replace", key); return true; } else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseRevision(oldValue))) { final long newRevision = entry.getRevision().get() + 1; - cache.put(key, serialize(entry.getValue(), valueSerializer, newRevision)); + cache.put(key, serializeCacheEntryValue(entry.getValue(), valueSerializer, newRevision)); getLogger().debug("Entry with key {} was updated during replace, with revision {}", key, newRevision); return true; } @@ -150,49 +159,36 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement @Override public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)) == null; + return cache.putIfAbsent(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION)) == null; } @Override public V getAndPutIfAbsent( final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer ) throws IOException { - final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)); + final byte[] result = cache.putIfAbsent(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION)); return (result == null) ? null : parsePayload(valueDeserializer, result); } @Override public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { - return cache.contains(getCacheEntryKey(key, keySerializer)); + return cache.contains(serializeCacheEntryKey(key, keySerializer)); } @Override public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - cache.put(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)); + cache.put(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION)); } @Override public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { - final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); + final byte[] result = cache.get(serializeCacheEntryKey(key, keySerializer)); return result == null ? null : parsePayload(valueDeserializer, result); } @Override public boolean remove(final K key, final Serializer keySerializer) throws IOException { - return cache.remove(getCacheEntryKey(key, keySerializer)); - } - - private static class RegexPredicate implements Predicate, Serializable { - private final Pattern pattern; - - private RegexPredicate(final String regex) { - this.pattern = Pattern.compile(regex); - } - - @Override - public boolean test(final String string) { - return pattern.matcher(string).matches(); - } + return cache.remove(serializeCacheEntryKey(key, keySerializer)); } @Override @@ -205,15 +201,7 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement return PROPERTY_DESCRIPTORS; } - private static long parseRevision(final byte[] value) { - return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong(); - } - - private static V parsePayload(final Deserializer deserializer, final byte[] value) throws IOException { - return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length)); - } - - private String getCacheEntryKey(final S key, final Serializer serializer) throws IOException { + private String serializeCacheEntryKey(final S key, final Serializer serializer) throws IOException { final String result; if (key instanceof String) { @@ -221,7 +209,7 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement } else { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); serializer.serialize(key, stream); - result = stream.toString("UTF-8"); + result = stream.toString(StandardCharsets.UTF_8); } if (result.isEmpty()) { @@ -231,20 +219,24 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement return result; } + private static K parseCacheEntryKey(final String key, final Deserializer keyDeserializer) throws IOException { + final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); + + return keyDeserializer.deserialize(keyBytes); + } + /** * Serializes a value using the given serializer. The first eight bytes of the array contains the revision. * The rest holds the actual serialized value. * - * @param value The value to serialize. + * @param value The value to serialize. * @param serializer The serializer to use in order to serialize the incoming value. - * @param version The version of the entry. - * @param The type of the value to be serialized. - * + * @param version The version of the entry. + * @param The type of the value to be serialized. * @return Byte array containing both version and value of the cache entry. - * * @throws IOException In case of any issue during working with intermediate byte stream. */ - private byte[] serialize(final S value, final Serializer serializer, final long version) throws IOException { + private byte[] serializeCacheEntryValue(final S value, final Serializer serializer, final long version) throws IOException { final ByteArrayOutputStream stream = new ByteArrayOutputStream(); stream.write(getVersionByteArray(version)); @@ -252,6 +244,14 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement return stream.toByteArray(); } + private static long parseRevision(final byte[] value) { + return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong(); + } + + private static V parsePayload(final Deserializer deserializer, final byte[] value) throws IOException { + return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length)); + } + private byte[] getVersionByteArray(final long version) { return ByteBuffer.allocate(Long.BYTES).putLong(version).array(); } diff --git a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java index 347eb931f1..f95d1b311a 100644 --- a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java +++ b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/HashMapHazelcastCache.java @@ -36,6 +36,11 @@ final public class HashMapHazelcastCache implements HazelcastCache { return name; } + @Override + public Set keySet() { + return values.keySet(); + } + @Override public byte[] get(final String key) { return values.get(key); diff --git a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java index d814dbf644..720b5f2003 100644 --- a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java +++ b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cache/IMapBasedHazelcastCacheTest.java @@ -26,6 +26,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import java.util.Arrays; import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -50,6 +51,20 @@ public class IMapBasedHazelcastCacheTest { testSubject = new IMapBasedHazelcastCache(storage, TTL); } + @Test + void testKeySet() { + // given + final Set keys = Set.of(KEY, KEY_2); + Mockito.when(storage.keySet()).thenReturn(keys); + + // when + final Set result = testSubject.keySet(); + + // then + Mockito.verify(storage).keySet(); + assertEquals(keys, result); + } + @Test public void testGet() { // given diff --git a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java index 9f47d10079..256efeecde 100644 --- a/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java +++ b/nifi-extension-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java @@ -35,7 +35,9 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.Set; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -291,6 +293,40 @@ public class HazelcastMapCacheClientTest { assertEquals(value, result); } + @Test + void testKeySetOnEmptyCache() throws IOException { + thenKeySetEquals(Set.of(), SERIALIZER); + } + + @Test + void testKeySetOnNonEmptyCache() throws IOException { + //when + whenPutEntry("key1", "1-value"); + whenPutEntry("key2", "2-value"); + whenPutEntry("key3", "3-value"); + whenPutEntry("key4", "4-value"); + + // then + thenKeySetEquals(Set.of("key1", "key2", "key3", "key4"), SERIALIZER); + } + + @Test + void testKeyWithNonStringKeys() throws IOException { + // given + final Serializer nonStringKeySerializer = + (value, output) -> output.write(ByteBuffer.allocate(4).putInt(value).array()); + final Deserializer nonStringKeyDeserializer = input -> ByteBuffer.wrap(input).getInt(); + + // when + testSubject.put(1, "1-value", nonStringKeySerializer, SERIALIZER); + testSubject.put(2, "2-value", nonStringKeySerializer, SERIALIZER); + testSubject.put(3, "3-value", nonStringKeySerializer, SERIALIZER); + testSubject.put(4, "4-value", nonStringKeySerializer, SERIALIZER); + + // then + thenKeySetEquals(Set.of(1, 2, 3, 4), nonStringKeyDeserializer); + } + private void whenRemoveEntryIsSuccessful() throws IOException { assertTrue(testSubject.remove(KEY, SERIALIZER)); } @@ -329,6 +365,10 @@ public class HazelcastMapCacheClientTest { assertFalse(testSubject.replace(cacheEntry, SERIALIZER, SERIALIZER)); } + private void thenKeySetEquals(final Set keys, final Deserializer keyDeserializer) throws IOException { + assertEquals(testSubject.keySet(keyDeserializer), keys); + } + private void thenEntryIsNotInCache(final String key) throws IOException { assertFalse(testSubject.containsKey(key, SERIALIZER)); }