NIFI-14043 Add support for keySet in HazelcastMapCacheClient (#9553)

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Lucas 2024-11-25 15:29:02 +01:00 committed by GitHub
parent cfa495b64d
commit 2199d64298
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 118 additions and 45 deletions

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.nifi.hazelcast.services.cache; package org.apache.nifi.hazelcast.services.cache;
import java.util.Set;
import java.util.function.Predicate; import java.util.function.Predicate;
/** /**
@ -32,7 +33,14 @@ public interface HazelcastCache {
String name(); String name();
/** /**
* Returns the value of the cache entry defined by the the key. * Returns a set of all keys currently in the cache
*
* @return The Set of all keys currently in the cache
*/
Set<String> keySet();
/**
* Returns the value of the cache entry defined by the key.
* *
* @param key Key of the entry, must not be null. * @param key Key of the entry, must not be null.
* *

View File

@ -51,6 +51,11 @@ public class IMapBasedHazelcastCache implements HazelcastCache {
return storage.getName(); return storage.getName();
} }
@Override
public Set<String> keySet() {
return storage.keySet();
}
@Override @Override
public byte[] get(final String key) { public byte[] get(final String key) {
return storage.get(key); return storage.get(key);

View File

@ -34,23 +34,23 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.regex.Pattern;
/** /**
* An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. * An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache.
* *
* Note: By design, the client should not directly depend on Hazelcast specific classes to allow easy version and implementation changes. * Note: By design, the client should not directly depend on Hazelcast specific classes to allow easy version and implementation changes.
*/ */
@Tags({ "hazelcast", "cache", "map"}) @Tags({"hazelcast", "cache", "map"})
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " + @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Hazelcast as the backing cache. This service relies on " +
"an other controller service, manages the actual Hazelcast calls, set in Hazelcast Cache Manager.") "an other controller service, manages the actual Hazelcast calls, set in Hazelcast Cache Manager.")
public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { public class HazelcastMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
@ -116,9 +116,18 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
} }
} }
@Override
public <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException {
final HashSet<K> keySet = new HashSet<>();
for (String key : cache.keySet()) {
keySet.add(parseCacheEntryKey(key, keyDeserializer));
}
return keySet;
}
@Override @Override
public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); final byte[] result = cache.get(serializeCacheEntryKey(key, keySerializer));
return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseRevision(result)); return (result == null) ? null : new AtomicCacheEntry<>(key, parsePayload(valueDeserializer, result), parseRevision(result));
} }
@ -128,18 +137,18 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
return false; return false;
} }
final String key = getCacheEntryKey(entry.getKey(), keySerializer); final String key = serializeCacheEntryKey(entry.getKey(), keySerializer);
try (final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) { try (final HazelcastCache.HazelcastCacheEntryLock lock = cache.acquireLock(key)) {
final byte[] oldValue = cache.get(key); final byte[] oldValue = cache.get(key);
if (oldValue == null && (!entry.getRevision().isPresent() || entry.getRevision().get() < STARTING_REVISION)) { if (oldValue == null && (entry.getRevision().isEmpty() || entry.getRevision().get() < STARTING_REVISION)) {
cache.put(key, serialize(entry.getValue(), valueSerializer, STARTING_REVISION)); cache.put(key, serializeCacheEntryValue(entry.getValue(), valueSerializer, STARTING_REVISION));
getLogger().debug("Entry with key {} was added during replace", key); getLogger().debug("Entry with key {} was added during replace", key);
return true; return true;
} else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseRevision(oldValue))) { } else if (oldValue != null && Objects.equals(entry.getRevision().get(), parseRevision(oldValue))) {
final long newRevision = entry.getRevision().get() + 1; final long newRevision = entry.getRevision().get() + 1;
cache.put(key, serialize(entry.getValue(), valueSerializer, newRevision)); cache.put(key, serializeCacheEntryValue(entry.getValue(), valueSerializer, newRevision));
getLogger().debug("Entry with key {} was updated during replace, with revision {}", key, newRevision); getLogger().debug("Entry with key {} was updated during replace, with revision {}", key, newRevision);
return true; return true;
} }
@ -150,49 +159,36 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
@Override @Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)) == null; return cache.putIfAbsent(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION)) == null;
} }
@Override @Override
public <K, V> V getAndPutIfAbsent( public <K, V> V getAndPutIfAbsent(
final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer
) throws IOException { ) throws IOException {
final byte[] result = cache.putIfAbsent(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)); final byte[] result = cache.putIfAbsent(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION));
return (result == null) ? null : parsePayload(valueDeserializer, result); return (result == null) ? null : parsePayload(valueDeserializer, result);
} }
@Override @Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return cache.contains(getCacheEntryKey(key, keySerializer)); return cache.contains(serializeCacheEntryKey(key, keySerializer));
} }
@Override @Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
cache.put(getCacheEntryKey(key, keySerializer), serialize(value, valueSerializer, STARTING_REVISION)); cache.put(serializeCacheEntryKey(key, keySerializer), serializeCacheEntryValue(value, valueSerializer, STARTING_REVISION));
} }
@Override @Override
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
final byte[] result = cache.get(getCacheEntryKey(key, keySerializer)); final byte[] result = cache.get(serializeCacheEntryKey(key, keySerializer));
return result == null ? null : parsePayload(valueDeserializer, result); return result == null ? null : parsePayload(valueDeserializer, result);
} }
@Override @Override
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException { public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
return cache.remove(getCacheEntryKey(key, keySerializer)); return cache.remove(serializeCacheEntryKey(key, keySerializer));
}
private static class RegexPredicate implements Predicate<String>, Serializable {
private final Pattern pattern;
private RegexPredicate(final String regex) {
this.pattern = Pattern.compile(regex);
}
@Override
public boolean test(final String string) {
return pattern.matcher(string).matches();
}
} }
@Override @Override
@ -205,15 +201,7 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
return PROPERTY_DESCRIPTORS; return PROPERTY_DESCRIPTORS;
} }
private static long parseRevision(final byte[] value) { private <S> String serializeCacheEntryKey(final S key, final Serializer<S> serializer) throws IOException {
return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong();
}
private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException {
return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length));
}
private <S> String getCacheEntryKey(final S key, final Serializer<S> serializer) throws IOException {
final String result; final String result;
if (key instanceof String) { if (key instanceof String) {
@ -221,7 +209,7 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
} else { } else {
final ByteArrayOutputStream stream = new ByteArrayOutputStream(); final ByteArrayOutputStream stream = new ByteArrayOutputStream();
serializer.serialize(key, stream); serializer.serialize(key, stream);
result = stream.toString("UTF-8"); result = stream.toString(StandardCharsets.UTF_8);
} }
if (result.isEmpty()) { if (result.isEmpty()) {
@ -231,20 +219,24 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
return result; return result;
} }
private static <K> K parseCacheEntryKey(final String key, final Deserializer<K> keyDeserializer) throws IOException {
final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
return keyDeserializer.deserialize(keyBytes);
}
/** /**
* Serializes a value using the given serializer. The first eight bytes of the array contains the revision. * Serializes a value using the given serializer. The first eight bytes of the array contains the revision.
* The rest holds the actual serialized value. * The rest holds the actual serialized value.
* *
* @param value The value to serialize. * @param value The value to serialize.
* @param serializer The serializer to use in order to serialize the incoming value. * @param serializer The serializer to use in order to serialize the incoming value.
* @param version The version of the entry. * @param version The version of the entry.
* @param <S> The type of the value to be serialized. * @param <S> The type of the value to be serialized.
*
* @return Byte array containing both version and value of the cache entry. * @return Byte array containing both version and value of the cache entry.
*
* @throws IOException In case of any issue during working with intermediate byte stream. * @throws IOException In case of any issue during working with intermediate byte stream.
*/ */
private <S> byte[] serialize(final S value, final Serializer<S> serializer, final long version) throws IOException { private <S> byte[] serializeCacheEntryValue(final S value, final Serializer<S> serializer, final long version) throws IOException {
final ByteArrayOutputStream stream = new ByteArrayOutputStream(); final ByteArrayOutputStream stream = new ByteArrayOutputStream();
stream.write(getVersionByteArray(version)); stream.write(getVersionByteArray(version));
@ -252,6 +244,14 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
return stream.toByteArray(); return stream.toByteArray();
} }
private static long parseRevision(final byte[] value) {
return ByteBuffer.wrap(Arrays.copyOfRange(value, 0, Long.BYTES)).getLong();
}
private static <V> V parsePayload(final Deserializer<V> deserializer, final byte[] value) throws IOException {
return deserializer.deserialize(Arrays.copyOfRange(value, Long.BYTES, value.length));
}
private byte[] getVersionByteArray(final long version) { private byte[] getVersionByteArray(final long version) {
return ByteBuffer.allocate(Long.BYTES).putLong(version).array(); return ByteBuffer.allocate(Long.BYTES).putLong(version).array();
} }

View File

@ -36,6 +36,11 @@ final public class HashMapHazelcastCache implements HazelcastCache {
return name; return name;
} }
@Override
public Set<String> keySet() {
return values.keySet();
}
@Override @Override
public byte[] get(final String key) { public byte[] get(final String key) {
return values.get(key); return values.get(key);

View File

@ -26,6 +26,7 @@ import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -50,6 +51,20 @@ public class IMapBasedHazelcastCacheTest {
testSubject = new IMapBasedHazelcastCache(storage, TTL); testSubject = new IMapBasedHazelcastCache(storage, TTL);
} }
@Test
void testKeySet() {
// given
final Set<String> keys = Set.of(KEY, KEY_2);
Mockito.when(storage.keySet()).thenReturn(keys);
// when
final Set<String> result = testSubject.keySet();
// then
Mockito.verify(storage).keySet();
assertEquals(keys, result);
}
@Test @Test
public void testGet() { public void testGet() {
// given // given

View File

@ -35,7 +35,9 @@ import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -291,6 +293,40 @@ public class HazelcastMapCacheClientTest {
assertEquals(value, result); assertEquals(value, result);
} }
@Test
void testKeySetOnEmptyCache() throws IOException {
thenKeySetEquals(Set.of(), SERIALIZER);
}
@Test
void testKeySetOnNonEmptyCache() throws IOException {
//when
whenPutEntry("key1", "1-value");
whenPutEntry("key2", "2-value");
whenPutEntry("key3", "3-value");
whenPutEntry("key4", "4-value");
// then
thenKeySetEquals(Set.of("key1", "key2", "key3", "key4"), SERIALIZER);
}
@Test
void testKeyWithNonStringKeys() throws IOException {
// given
final Serializer<Integer> nonStringKeySerializer =
(value, output) -> output.write(ByteBuffer.allocate(4).putInt(value).array());
final Deserializer<Integer> nonStringKeyDeserializer = input -> ByteBuffer.wrap(input).getInt();
// when
testSubject.put(1, "1-value", nonStringKeySerializer, SERIALIZER);
testSubject.put(2, "2-value", nonStringKeySerializer, SERIALIZER);
testSubject.put(3, "3-value", nonStringKeySerializer, SERIALIZER);
testSubject.put(4, "4-value", nonStringKeySerializer, SERIALIZER);
// then
thenKeySetEquals(Set.of(1, 2, 3, 4), nonStringKeyDeserializer);
}
private void whenRemoveEntryIsSuccessful() throws IOException { private void whenRemoveEntryIsSuccessful() throws IOException {
assertTrue(testSubject.remove(KEY, SERIALIZER)); assertTrue(testSubject.remove(KEY, SERIALIZER));
} }
@ -329,6 +365,10 @@ public class HazelcastMapCacheClientTest {
assertFalse(testSubject.replace(cacheEntry, SERIALIZER, SERIALIZER)); assertFalse(testSubject.replace(cacheEntry, SERIALIZER, SERIALIZER));
} }
private <K> void thenKeySetEquals(final Set<K> keys, final Deserializer<K> keyDeserializer) throws IOException {
assertEquals(testSubject.keySet(keyDeserializer), keys);
}
private void thenEntryIsNotInCache(final String key) throws IOException { private void thenEntryIsNotInCache(final String key) throws IOException {
assertFalse(testSubject.containsKey(key, SERIALIZER)); assertFalse(testSubject.containsKey(key, SERIALIZER));
} }