NIFI-4049: Refactor AtomicDistributedMapCacheClient

To be used with cache engines that does not have revision number.

NIFI-4049: Refactor AtomicDistributedMapCacheClient

Removed old methods completely.

This closes #1904.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2017-06-09 16:28:45 +09:00 committed by Bryan Bende
parent 8ef4fddddd
commit de1b84e2aa
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
7 changed files with 99 additions and 90 deletions

View File

@ -18,8 +18,8 @@ package org.apache.nifi.processors.standard;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
@ -59,7 +59,7 @@ public class WaitNotifyProtocol {
*/ */
transient private String identifier; transient private String identifier;
transient private long revision = -1; transient private AtomicCacheEntry<String, String, Object> cachedEntry;
private Map<String, Long> counts = new HashMap<>(); private Map<String, Long> counts = new HashMap<>();
private Map<String, String> attributes = new HashMap<>(); private Map<String, String> attributes = new HashMap<>();
private int releasableCount = 0; private int releasableCount = 0;
@ -225,9 +225,10 @@ public class WaitNotifyProtocol {
* @throws IOException thrown when it failed interacting with the cache engine * @throws IOException thrown when it failed interacting with the cache engine
* @throws DeserializationException thrown if the cache found is not in expected serialized format * @throws DeserializationException thrown if the cache found is not in expected serialized format
*/ */
@SuppressWarnings("unchecked")
public Signal getSignal(final String signalId) throws IOException, DeserializationException { public Signal getSignal(final String signalId) throws IOException, DeserializationException {
final CacheEntry<String, String> entry = cache.fetch(signalId, stringSerializer, stringDeserializer); final AtomicCacheEntry<String, String, Object> entry = (AtomicCacheEntry<String, String, Object>) cache.fetch(signalId, stringSerializer, stringDeserializer);
if (entry == null) { if (entry == null) {
// No signal found. // No signal found.
@ -239,7 +240,7 @@ public class WaitNotifyProtocol {
try { try {
final Signal signal = objectMapper.readValue(value, Signal.class); final Signal signal = objectMapper.readValue(value, Signal.class);
signal.identifier = signalId; signal.identifier = signalId;
signal.revision = entry.getRevision(); signal.cachedEntry = entry;
return signal; return signal;
} catch (final JsonParseException jsonE) { } catch (final JsonParseException jsonE) {
// Try to read it as FlowFileAttributes for backward compatibility. // Try to read it as FlowFileAttributes for backward compatibility.
@ -270,7 +271,12 @@ public class WaitNotifyProtocol {
public boolean replace(final Signal signal) throws IOException { public boolean replace(final Signal signal) throws IOException {
final String signalJson = objectMapper.writeValueAsString(signal); final String signalJson = objectMapper.writeValueAsString(signal);
return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision); if (signal.cachedEntry == null) {
signal.cachedEntry = new AtomicCacheEntry<>(signal.identifier, signalJson, null);
} else {
signal.cachedEntry.setValue(signalJson);
}
return cache.replace(signal.cachedEntry, stringSerializer, stringSerializer);
} }
} }

View File

@ -17,10 +17,10 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.StandardCacheEntry;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunner;
@ -310,8 +310,8 @@ public class TestNotify {
} }
static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
private final ConcurrentMap<Object, CacheEntry> values = new ConcurrentHashMap<>(); private final ConcurrentMap<Object, AtomicCacheEntry<Object, Object, Long>> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false; private boolean failOnCalls = false;
void setFailOnCalls(boolean failOnCalls){ void setFailOnCalls(boolean failOnCalls){
@ -359,7 +359,7 @@ public class TestNotify {
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail(); verifyNotFail();
final CacheEntry entry = values.get(key); final AtomicCacheEntry entry = values.get(key);
if (entry == null) { if (entry == null) {
return null; return null;
} }
@ -397,22 +397,23 @@ public class TestNotify {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail(); verifyNotFail();
return values.get(key); return (AtomicCacheEntry<K, V, Long>) values.get(key);
} }
@Override @Override
public <K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException { public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
verifyNotFail(); verifyNotFail();
final CacheEntry existing = values.get(key); final K key = entry.getKey();
if (existing != null && existing.getRevision() != revision) { final AtomicCacheEntry<Object, Object, Long> existing = values.get(key);
if (existing != null && !existing.getRevision().equals(entry.getRevision())) {
return false; return false;
} }
values.put(key, new StandardCacheEntry<>(key, value, revision + 1)); values.put(key, new AtomicCacheEntry<>(key, entry.getValue(), entry.getRevision().orElse(0L) + 1));
return true; return true;
} }

View File

@ -17,9 +17,8 @@
package org.apache.nifi.processors.standard; package org.apache.nifi.processors.standard;
import org.apache.activemq.util.ByteArrayOutputStream; import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry;
import org.apache.nifi.distributed.cache.client.StandardCacheEntry;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
@ -46,33 +45,29 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
public class TestWaitNotifyProtocol { public class TestWaitNotifyProtocol {
private final Map<String, CacheEntry<String, String>> cacheEntries = new HashMap<>(); private final Map<String, AtomicCacheEntry<String, String, Long>> cacheEntries = new HashMap<>();
private AtomicDistributedMapCacheClient cache; private AtomicDistributedMapCacheClient<Long> cache;
@SuppressWarnings("unchecked")
private final Answer successfulReplace = invocation -> { private final Answer successfulReplace = invocation -> {
final String key = invocation.getArgumentAt(0, String.class); final AtomicCacheEntry<String, String, Long> entry = invocation.getArgumentAt(0, AtomicCacheEntry.class);
final String value = invocation.getArgumentAt(1, String.class); cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getRevision().orElse(0L) + 1));
final Long revision = invocation.getArgumentAt(4, Long.class);
cacheEntries.put(key, new StandardCacheEntry<>(key, value, revision + 1));
return true; return true;
}; };
@Before @Before
@SuppressWarnings("unchecked")
public void before() throws Exception { public void before() throws Exception {
cacheEntries.clear(); cacheEntries.clear();
// Default mock implementations. // Default mock implementations.
cache = mock(AtomicDistributedMapCacheClient.class); cache = mock(AtomicDistributedMapCacheClient.class);
doAnswer(invocation -> { doAnswer(invocation -> cacheEntries.get(invocation.getArguments()[0])).when(cache).fetch(any(), any(), any());
final CacheEntry<String, String> entry = cacheEntries.get(invocation.getArguments()[0]);
return entry;
}).when(cache).fetch(any(), any(), any());
} }
@Test @Test
@ -80,7 +75,7 @@ public class TestWaitNotifyProtocol {
// replace always return false. // replace always return false.
doAnswer(invocation -> false) doAnswer(invocation -> false)
.when(cache).replace(any(), any(), any(), any(), anyLong()); .when(cache).replace(any(), any(), any());
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@ -95,7 +90,7 @@ public class TestWaitNotifyProtocol {
@Test @Test
public void testNotifyFirst() throws Exception { public void testNotifyFirst() throws Exception {
doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); doAnswer(successfulReplace).when(cache).replace(any(), any(), any());
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@ -106,16 +101,16 @@ public class TestWaitNotifyProtocol {
assertEquals(Long.valueOf(1), signal.getCounts().get("a")); assertEquals(Long.valueOf(1), signal.getCounts().get("a"));
assertTrue(cacheEntries.containsKey("signal-id")); assertTrue(cacheEntries.containsKey("signal-id"));
final CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id"); final AtomicCacheEntry<String, String, Long> cacheEntry = cacheEntries.get("signal-id");
assertEquals(0, cacheEntry.getRevision()); assertEquals(1, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
} }
@Test @Test
public void testNotifyCounters() throws Exception { public void testNotifyCounters() throws Exception {
doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); doAnswer(successfulReplace).when(cache).replace(any(), any(), any());
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@ -124,21 +119,21 @@ public class TestWaitNotifyProtocol {
protocol.notify(signalId, "a", 1, null); protocol.notify(signalId, "a", 1, null);
protocol.notify(signalId, "a", 1, null); protocol.notify(signalId, "a", 1, null);
CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id"); AtomicCacheEntry<String, String, Long> cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision()); assertEquals(2, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "a", 10, null); protocol.notify(signalId, "a", 10, null);
cacheEntry = cacheEntries.get("signal-id"); cacheEntry = cacheEntries.get("signal-id");
assertEquals(2, cacheEntry.getRevision()); assertEquals(3, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
protocol.notify(signalId, "b", 2, null); protocol.notify(signalId, "b", 2, null);
protocol.notify(signalId, "c", 3, null); protocol.notify(signalId, "c", 3, null);
cacheEntry = cacheEntries.get("signal-id"); cacheEntry = cacheEntries.get("signal-id");
assertEquals(4, cacheEntry.getRevision()); assertEquals(5, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, Integer> deltas = new HashMap<>(); final Map<String, Integer> deltas = new HashMap<>();
@ -147,20 +142,20 @@ public class TestWaitNotifyProtocol {
protocol.notify("signal-id", deltas, null); protocol.notify("signal-id", deltas, null);
cacheEntry = cacheEntries.get("signal-id"); cacheEntry = cacheEntries.get("signal-id");
assertEquals(5, cacheEntry.getRevision()); assertEquals(6, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
// Zero clear 'b'. // Zero clear 'b'.
protocol.notify("signal-id", "b", 0, null); protocol.notify("signal-id", "b", 0, null);
cacheEntry = cacheEntries.get("signal-id"); cacheEntry = cacheEntries.get("signal-id");
assertEquals(6, cacheEntry.getRevision()); assertEquals(7, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue());
} }
@Test @Test
public void testNotifyAttributes() throws Exception { public void testNotifyAttributes() throws Exception {
doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); doAnswer(successfulReplace).when(cache).replace(any(), any(), any());
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@ -172,8 +167,8 @@ public class TestWaitNotifyProtocol {
protocol.notify(signalId, "a", 1, attributeA1); protocol.notify(signalId, "a", 1, attributeA1);
CacheEntry<String, String> cacheEntry = cacheEntries.get("signal-id"); AtomicCacheEntry<String, String, Long> cacheEntry = cacheEntries.get("signal-id");
assertEquals(0, cacheEntry.getRevision()); assertEquals(1L, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue());
final Map<String, String> attributeA2 = new HashMap<>(); final Map<String, String> attributeA2 = new HashMap<>();
@ -184,7 +179,7 @@ public class TestWaitNotifyProtocol {
protocol.notify(signalId, "a", 1, attributeA2); protocol.notify(signalId, "a", 1, attributeA2);
cacheEntry = cacheEntries.get("signal-id"); cacheEntry = cacheEntries.get("signal-id");
assertEquals(1, cacheEntry.getRevision()); assertEquals(2L, cacheEntry.getRevision().orElse(-1L).longValue());
assertEquals("Updated attributes should be merged correctly", assertEquals("Updated attributes should be merged correctly",
"{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue()); "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue());
@ -192,7 +187,7 @@ public class TestWaitNotifyProtocol {
@Test @Test
public void testSignalCount() throws Exception { public void testSignalCount() throws Exception {
doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); doAnswer(successfulReplace).when(cache).replace(any(), any(), any());
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
@ -233,7 +228,7 @@ public class TestWaitNotifyProtocol {
*/ */
@Test @Test
public void testNiFiVersionUpgrade() throws Exception { public void testNiFiVersionUpgrade() throws Exception {
doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); doAnswer(successfulReplace).when(cache).replace(any(), any(), any());
// Simulate old cache entry. // Simulate old cache entry.
final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer(); final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer();
@ -245,7 +240,7 @@ public class TestWaitNotifyProtocol {
attributesSerializer.serialize(cachedAttributes, bos); attributesSerializer.serialize(cachedAttributes, bos);
final String signalId = "old-entry"; final String signalId = "old-entry";
cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0)); cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0L));
final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache);
final Signal signal = protocol.getSignal(signalId); final Signal signal = protocol.getSignal(signalId);
@ -255,7 +250,7 @@ public class TestWaitNotifyProtocol {
assertEquals("value2", signal.getAttributes().get("key2")); assertEquals("value2", signal.getAttributes().get("key2"));
assertEquals("value3", signal.getAttributes().get("key3")); assertEquals("value3", signal.getAttributes().get("key3"));
cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0)); cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0L));
try { try {
protocol.getSignal(signalId); protocol.getSignal(signalId);
fail("Should fail since cached value was not in expected format."); fail("Should fail since cached value was not in expected format.");

View File

@ -16,31 +16,43 @@
*/ */
package org.apache.nifi.distributed.cache.client; package org.apache.nifi.distributed.cache.client;
public class StandardCacheEntry<K,V> implements AtomicDistributedMapCacheClient.CacheEntry<K,V> { import java.util.Optional;
public class AtomicCacheEntry<K, V, R> {
private final K key; private final K key;
private final V value; private V value;
private final long revision; private final R revision;
/**
public StandardCacheEntry(final K key, final V value, final long revision) { * Create new cache entry.
* @param key cache key
* @param value cache value
* @param revision cache revision, can be null with a brand new entry
*/
public AtomicCacheEntry(final K key, final V value, final R revision) {
this.key = key; this.key = key;
this.value = value; this.value = value;
this.revision = revision; this.revision = revision;
} }
@Override /**
public long getRevision() { * @return the latest revision stored in a cache server
return revision; */
public Optional<R> getRevision() {
return Optional.ofNullable(revision);
} }
@Override
public K getKey() { public K getKey() {
return key; return key;
} }
@Override
public V getValue() { public V getValue() {
return value; return value;
} }
public void setValue(V value) {
this.value = value;
}
} }

View File

@ -29,21 +29,14 @@ import java.io.IOException;
* this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2. * this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2.
* *
* <p>If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException. * <p>If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException.
* @param <R> The revision type.
* If the underlying cache storage supports the concept of revision to implement optimistic locking, then a client implementation should use that.
* Otherwise set the cached value and check if the key is not updated at {@link #replace(AtomicCacheEntry, Serializer, Serializer)}
*/ */
@Tags({"distributed", "client", "cluster", "map", "cache"}) @Tags({"distributed", "client", "cluster", "map", "cache"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows " @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows "
+ "multiple nodes to coordinate state with a single remote entity.") + "multiple nodes to coordinate state with a single remote entity.")
public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient { public interface AtomicDistributedMapCacheClient<R> extends DistributedMapCacheClient {
interface CacheEntry<K, V> {
long getRevision();
K getKey();
V getValue();
}
/** /**
* Fetch a CacheEntry with a key. * Fetch a CacheEntry with a key.
@ -55,22 +48,20 @@ public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClie
* @return A CacheEntry instance if one exists, otherwise <cod>null</cod>. * @return A CacheEntry instance if one exists, otherwise <cod>null</cod>.
* @throws IOException if unable to communicate with the remote instance * @throws IOException if unable to communicate with the remote instance
*/ */
<K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException; <K, V> AtomicCacheEntry<K, V, R> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
/** /**
* Replace an existing key with new value. * Replace an existing key with new value.
* @param <K> the key type * @param <K> the key type
* @param <V> the value type * @param <V> the value type
* @param key the key to replace * @param entry should provide the new value for {@link AtomicCacheEntry#getValue()},
* @param value the new value for the key * and the same revision in the cache storage for {@link AtomicCacheEntry#getRevision()},
* if the revision does not match with the one in the cache storage, value will not be replaced.
* @param keySerializer key serializer * @param keySerializer key serializer
* @param valueSerializer value serializer * @param valueSerializer value serializer
* @param revision a revision that was retrieved by a preceding fetch operation, if the key is already updated by other client,
* this doesn't match with the one on server, therefore the replace operation will not be performed.
* If there's no existing entry for the key, any revision can replace the key.
* @return true only if the key is replaced. * @return true only if the key is replaced.
* @throws IOException if unable to communicate with the remote instance * @throws IOException if unable to communicate with the remote instance
*/ */
<K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException; <K, V> boolean replace(AtomicCacheEntry<K, V, R> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException;
} }

View File

@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory;
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
+ "between nodes in a NiFi cluster") + "between nodes in a NiFi cluster")
public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
@ -237,7 +237,8 @@ public class DistributedMapCacheClientService extends AbstractControllerService
} }
@Override @Override
public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { @SuppressWarnings("unchecked")
public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(session -> { return withCommsSession(session -> {
validateProtocolVersion(session, 2); validateProtocolVersion(session, 2);
@ -257,8 +258,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService
return null; return null;
} }
final StandardCacheEntry<K, V> standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision); return new AtomicCacheEntry(key, valueDeserializer.deserialize(responseBuffer), revision);
return standardCacheEntry;
}); });
} }
@ -269,16 +269,16 @@ public class DistributedMapCacheClientService extends AbstractControllerService
} }
@Override @Override
public <K, V> boolean replace(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final long revision) throws IOException { public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> entry, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
return withCommsSession(session -> { return withCommsSession(session -> {
validateProtocolVersion(session, 2); validateProtocolVersion(session, 2);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("replace"); dos.writeUTF("replace");
serialize(key, keySerializer, dos); serialize(entry.getKey(), keySerializer, dos);
dos.writeLong(revision); dos.writeLong(entry.getRevision().orElse(0L));
serialize(value, valueSerializer, dos); serialize(entry.getValue(), valueSerializer, dos);
dos.flush(); dos.flush();

View File

@ -34,7 +34,7 @@ import java.util.Map;
import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService; import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
@ -587,18 +587,20 @@ public class TestServerAndClient {
client1.put(key, "valueC1-0", stringSerializer, stringSerializer); client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
// Client 1 and 2 fetch the key // Client 1 and 2 fetch the key
AtomicDistributedMapCacheClient.CacheEntry<String, String> c1 = client1.fetch(key, stringSerializer, stringDeserializer); AtomicCacheEntry<String, String, Long> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
AtomicDistributedMapCacheClient.CacheEntry<String, String> c2 = client2.fetch(key, stringSerializer, stringDeserializer); AtomicCacheEntry<String, String, Long> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
assertEquals(0, c1.getRevision()); assertEquals(0, c1.getRevision());
assertEquals("valueC1-0", c1.getValue()); assertEquals("valueC1-0", c1.getValue());
assertEquals(0, c2.getRevision()); assertEquals(0, c2.getRevision());
assertEquals("valueC1-0", c2.getValue()); assertEquals("valueC1-0", c2.getValue());
// Client 1 replace // Client 1 replace
boolean c1Result = client1.replace(key, "valueC1-1", stringSerializer, stringSerializer, c1.getRevision()); c1.setValue("valueC1-1");
boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer);
assertTrue("C1 should be able to replace the key", c1Result); assertTrue("C1 should be able to replace the key", c1Result);
// Client 2 replace with the old revision // Client 2 replace with the old revision
boolean c2Result = client2.replace(key, "valueC2-1", stringSerializer, stringSerializer, c2.getRevision()); c2.setValue("valueC2-1");
boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer);
assertFalse("C2 shouldn't be able to replace the key", c2Result); assertFalse("C2 shouldn't be able to replace the key", c2Result);
// Client 2 fetch the key again // Client 2 fetch the key again
@ -607,7 +609,8 @@ public class TestServerAndClient {
assertEquals(1, c2.getRevision()); assertEquals(1, c2.getRevision());
// Now, Client 2 knows the correct revision so it can replace the key // Now, Client 2 knows the correct revision so it can replace the key
c2Result = client2.replace(key, "valueC2-2", stringSerializer, stringSerializer, c2.getRevision()); c2.setValue("valueC2-2");
c2Result = client2.replace(c2, stringSerializer, stringSerializer);
assertTrue("C2 should be able to replace the key", c2Result); assertTrue("C2 should be able to replace the key", c2Result);
// Assert the cache // Assert the cache
@ -678,7 +681,8 @@ public class TestServerAndClient {
} }
try { try {
client.replace(key, "value2", stringSerializer, stringSerializer, 0L); AtomicCacheEntry<String,String,Long> entry = new AtomicCacheEntry<>(key, "value2", 0L);
client.replace(entry, stringSerializer, stringSerializer);
fail("Version 2 operations should NOT work."); fail("Version 2 operations should NOT work.");
} catch (UnsupportedOperationException e) { } catch (UnsupportedOperationException e) {
} }