diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java index 1c89108960..cd55d27a61 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java @@ -18,8 +18,8 @@ package org.apache.nifi.processors.standard; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; @@ -59,7 +59,7 @@ public class WaitNotifyProtocol { */ transient private String identifier; - transient private long revision = -1; + transient private AtomicCacheEntry cachedEntry; private Map counts = new HashMap<>(); private Map attributes = new HashMap<>(); private int releasableCount = 0; @@ -225,9 +225,10 @@ public class WaitNotifyProtocol { * @throws IOException thrown when it failed interacting with the cache engine * @throws DeserializationException thrown if the cache found is not in expected serialized format */ + @SuppressWarnings("unchecked") public Signal getSignal(final String signalId) throws IOException, DeserializationException { - final CacheEntry entry = cache.fetch(signalId, stringSerializer, stringDeserializer); + final AtomicCacheEntry entry = (AtomicCacheEntry) cache.fetch(signalId, stringSerializer, stringDeserializer); if (entry == null) { // No signal found. @@ -239,7 +240,7 @@ public class WaitNotifyProtocol { try { final Signal signal = objectMapper.readValue(value, Signal.class); signal.identifier = signalId; - signal.revision = entry.getRevision(); + signal.cachedEntry = entry; return signal; } catch (final JsonParseException jsonE) { // Try to read it as FlowFileAttributes for backward compatibility. @@ -270,7 +271,12 @@ public class WaitNotifyProtocol { public boolean replace(final Signal signal) throws IOException { final String signalJson = objectMapper.writeValueAsString(signal); - return cache.replace(signal.identifier, signalJson, stringSerializer, stringSerializer, signal.revision); + if (signal.cachedEntry == null) { + signal.cachedEntry = new AtomicCacheEntry<>(signal.identifier, signalJson, null); + } else { + signal.cachedEntry.setValue(signalJson); + } + return cache.replace(signal.cachedEntry, stringSerializer, stringSerializer); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index 2c5dbc12a0..3e0cd689d6 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -17,10 +17,10 @@ package org.apache.nifi.processors.standard; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; -import org.apache.nifi.distributed.cache.client.StandardCacheEntry; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; @@ -310,8 +310,8 @@ public class TestNotify { } - static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { - private final ConcurrentMap values = new ConcurrentHashMap<>(); + static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { + private final ConcurrentMap> values = new ConcurrentHashMap<>(); private boolean failOnCalls = false; void setFailOnCalls(boolean failOnCalls){ @@ -359,7 +359,7 @@ public class TestNotify { public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { verifyNotFail(); - final CacheEntry entry = values.get(key); + final AtomicCacheEntry entry = values.get(key); if (entry == null) { return null; } @@ -397,22 +397,23 @@ public class TestNotify { @Override @SuppressWarnings("unchecked") - public CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { + public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { verifyNotFail(); - return values.get(key); + return (AtomicCacheEntry) values.get(key); } @Override - public boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException { + public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { verifyNotFail(); - final CacheEntry existing = values.get(key); - if (existing != null && existing.getRevision() != revision) { + final K key = entry.getKey(); + final AtomicCacheEntry existing = values.get(key); + if (existing != null && !existing.getRevision().equals(entry.getRevision())) { return false; } - values.put(key, new StandardCacheEntry<>(key, value, revision + 1)); + values.put(key, new AtomicCacheEntry<>(key, entry.getValue(), entry.getRevision().orElse(0L) + 1)); return true; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java index 13b4346859..e3f982c8e1 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java @@ -17,9 +17,8 @@ package org.apache.nifi.processors.standard; import org.apache.activemq.util.ByteArrayOutputStream; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; -import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient.CacheEntry; -import org.apache.nifi.distributed.cache.client.StandardCacheEntry; import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.processors.standard.WaitNotifyProtocol.Signal; import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer; @@ -46,33 +45,29 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; public class TestWaitNotifyProtocol { - private final Map> cacheEntries = new HashMap<>(); + private final Map> cacheEntries = new HashMap<>(); - private AtomicDistributedMapCacheClient cache; + private AtomicDistributedMapCacheClient cache; + @SuppressWarnings("unchecked") private final Answer successfulReplace = invocation -> { - final String key = invocation.getArgumentAt(0, String.class); - final String value = invocation.getArgumentAt(1, String.class); - final Long revision = invocation.getArgumentAt(4, Long.class); - cacheEntries.put(key, new StandardCacheEntry<>(key, value, revision + 1)); + final AtomicCacheEntry entry = invocation.getArgumentAt(0, AtomicCacheEntry.class); + cacheEntries.put(entry.getKey(), new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), entry.getRevision().orElse(0L) + 1)); return true; }; @Before + @SuppressWarnings("unchecked") public void before() throws Exception { cacheEntries.clear(); // Default mock implementations. cache = mock(AtomicDistributedMapCacheClient.class); - doAnswer(invocation -> { - final CacheEntry entry = cacheEntries.get(invocation.getArguments()[0]); - return entry; - }).when(cache).fetch(any(), any(), any()); + doAnswer(invocation -> cacheEntries.get(invocation.getArguments()[0])).when(cache).fetch(any(), any(), any()); } @Test @@ -80,7 +75,7 @@ public class TestWaitNotifyProtocol { // replace always return false. doAnswer(invocation -> false) - .when(cache).replace(any(), any(), any(), any(), anyLong()); + .when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -95,7 +90,7 @@ public class TestWaitNotifyProtocol { @Test public void testNotifyFirst() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -106,16 +101,16 @@ public class TestWaitNotifyProtocol { assertEquals(Long.valueOf(1), signal.getCounts().get("a")); assertTrue(cacheEntries.containsKey("signal-id")); - final CacheEntry cacheEntry = cacheEntries.get("signal-id"); + final AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(0, cacheEntry.getRevision()); + assertEquals(1, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @Test public void testNotifyCounters() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -124,21 +119,21 @@ public class TestWaitNotifyProtocol { protocol.notify(signalId, "a", 1, null); protocol.notify(signalId, "a", 1, null); - CacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(1, cacheEntry.getRevision()); + AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); + assertEquals(2, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "a", 10, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(2, cacheEntry.getRevision()); + assertEquals(3, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); protocol.notify(signalId, "b", 2, null); protocol.notify(signalId, "c", 3, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(4, cacheEntry.getRevision()); + assertEquals(5, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); final Map deltas = new HashMap<>(); @@ -147,20 +142,20 @@ public class TestWaitNotifyProtocol { protocol.notify("signal-id", deltas, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(5, cacheEntry.getRevision()); + assertEquals(6, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); // Zero clear 'b'. protocol.notify("signal-id", "b", 0, null); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(6, cacheEntry.getRevision()); + assertEquals(7, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", cacheEntry.getValue()); } @Test public void testNotifyAttributes() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -172,8 +167,8 @@ public class TestWaitNotifyProtocol { protocol.notify(signalId, "a", 1, attributeA1); - CacheEntry cacheEntry = cacheEntries.get("signal-id"); - assertEquals(0, cacheEntry.getRevision()); + AtomicCacheEntry cacheEntry = cacheEntries.get("signal-id"); + assertEquals(1L, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", cacheEntry.getValue()); final Map attributeA2 = new HashMap<>(); @@ -184,7 +179,7 @@ public class TestWaitNotifyProtocol { protocol.notify(signalId, "a", 1, attributeA2); cacheEntry = cacheEntries.get("signal-id"); - assertEquals(1, cacheEntry.getRevision()); + assertEquals(2L, cacheEntry.getRevision().orElse(-1L).longValue()); assertEquals("Updated attributes should be merged correctly", "{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", cacheEntry.getValue()); @@ -192,7 +187,7 @@ public class TestWaitNotifyProtocol { @Test public void testSignalCount() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); @@ -233,7 +228,7 @@ public class TestWaitNotifyProtocol { */ @Test public void testNiFiVersionUpgrade() throws Exception { - doAnswer(successfulReplace).when(cache).replace(any(), any(), any(), any(), anyLong()); + doAnswer(successfulReplace).when(cache).replace(any(), any(), any()); // Simulate old cache entry. final FlowFileAttributesSerializer attributesSerializer = new FlowFileAttributesSerializer(); @@ -245,7 +240,7 @@ public class TestWaitNotifyProtocol { attributesSerializer.serialize(cachedAttributes, bos); final String signalId = "old-entry"; - cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0)); + cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, new String(bos.toByteArray(), StandardCharsets.UTF_8), 0L)); final WaitNotifyProtocol protocol = new WaitNotifyProtocol(cache); final Signal signal = protocol.getSignal(signalId); @@ -255,7 +250,7 @@ public class TestWaitNotifyProtocol { assertEquals("value2", signal.getAttributes().get("key2")); assertEquals("value3", signal.getAttributes().get("key3")); - cacheEntries.put(signalId, new StandardCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0)); + cacheEntries.put(signalId, new AtomicCacheEntry<>(signalId, "UNSUPPORTED_FORMAT", 0L)); try { protocol.getSignal(signalId); fail("Should fail since cached value was not in expected format."); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java similarity index 64% rename from nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java rename to nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java index b4949d50f3..42d03dd0b1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicCacheEntry.java @@ -16,31 +16,43 @@ */ package org.apache.nifi.distributed.cache.client; -public class StandardCacheEntry implements AtomicDistributedMapCacheClient.CacheEntry { +import java.util.Optional; + +public class AtomicCacheEntry { private final K key; - private final V value; - private final long revision; + private V value; + private final R revision; - - public StandardCacheEntry(final K key, final V value, final long revision) { + /** + * Create new cache entry. + * @param key cache key + * @param value cache value + * @param revision cache revision, can be null with a brand new entry + */ + public AtomicCacheEntry(final K key, final V value, final R revision) { this.key = key; this.value = value; this.revision = revision; } - @Override - public long getRevision() { - return revision; + /** + * @return the latest revision stored in a cache server + */ + public Optional getRevision() { + return Optional.ofNullable(revision); } - @Override public K getKey() { return key; } - @Override public V getValue() { return value; } + + public void setValue(V value) { + this.value = value; + } + } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java index d0b77e1165..080d666fa1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java @@ -29,21 +29,14 @@ import java.io.IOException; * this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2. * *

If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException. + * @param The revision type. + * If the underlying cache storage supports the concept of revision to implement optimistic locking, then a client implementation should use that. + * Otherwise set the cached value and check if the key is not updated at {@link #replace(AtomicCacheEntry, Serializer, Serializer)} */ @Tags({"distributed", "client", "cluster", "map", "cache"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows " + "multiple nodes to coordinate state with a single remote entity.") -public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient { - - interface CacheEntry { - - long getRevision(); - - K getKey(); - - V getValue(); - - } +public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient { /** * Fetch a CacheEntry with a key. @@ -55,22 +48,20 @@ public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClie * @return A CacheEntry instance if one exists, otherwise null. * @throws IOException if unable to communicate with the remote instance */ - CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException; + AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException; /** * Replace an existing key with new value. * @param the key type * @param the value type - * @param key the key to replace - * @param value the new value for the key + * @param entry should provide the new value for {@link AtomicCacheEntry#getValue()}, + * and the same revision in the cache storage for {@link AtomicCacheEntry#getRevision()}, + * if the revision does not match with the one in the cache storage, value will not be replaced. * @param keySerializer key serializer * @param valueSerializer value serializer - * @param revision a revision that was retrieved by a preceding fetch operation, if the key is already updated by other client, - * this doesn't match with the one on server, therefore the replace operation will not be performed. - * If there's no existing entry for the key, any revision can replace the key. * @return true only if the key is replaced. * @throws IOException if unable to communicate with the remote instance */ - boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException; + boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException; } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index f197bace70..9651c26cf4 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -49,7 +49,7 @@ import org.slf4j.LoggerFactory; @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " + "between nodes in a NiFi cluster") -public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { +public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); @@ -237,7 +237,8 @@ public class DistributedMapCacheClientService extends AbstractControllerService } @Override - public CacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + @SuppressWarnings("unchecked") + public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { return withCommsSession(session -> { validateProtocolVersion(session, 2); @@ -257,8 +258,7 @@ public class DistributedMapCacheClientService extends AbstractControllerService return null; } - final StandardCacheEntry standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision); - return standardCacheEntry; + return new AtomicCacheEntry(key, valueDeserializer.deserialize(responseBuffer), revision); }); } @@ -269,16 +269,16 @@ public class DistributedMapCacheClientService extends AbstractControllerService } @Override - public boolean replace(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final long revision) throws IOException { + public boolean replace(AtomicCacheEntry entry, Serializer keySerializer, Serializer valueSerializer) throws IOException { return withCommsSession(session -> { validateProtocolVersion(session, 2); final DataOutputStream dos = new DataOutputStream(session.getOutputStream()); dos.writeUTF("replace"); - serialize(key, keySerializer, dos); - dos.writeLong(revision); - serialize(value, valueSerializer, dos); + serialize(entry.getKey(), keySerializer, dos); + dos.writeLong(entry.getRevision().orElse(0L)); + serialize(entry.getValue(), valueSerializer, dos); dos.flush(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java index b08e69d2e9..34d9473045 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java @@ -34,7 +34,7 @@ import java.util.Map; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SystemUtils; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService; @@ -587,18 +587,20 @@ public class TestServerAndClient { client1.put(key, "valueC1-0", stringSerializer, stringSerializer); // Client 1 and 2 fetch the key - AtomicDistributedMapCacheClient.CacheEntry c1 = client1.fetch(key, stringSerializer, stringDeserializer); - AtomicDistributedMapCacheClient.CacheEntry c2 = client2.fetch(key, stringSerializer, stringDeserializer); + AtomicCacheEntry c1 = client1.fetch(key, stringSerializer, stringDeserializer); + AtomicCacheEntry c2 = client2.fetch(key, stringSerializer, stringDeserializer); assertEquals(0, c1.getRevision()); assertEquals("valueC1-0", c1.getValue()); assertEquals(0, c2.getRevision()); assertEquals("valueC1-0", c2.getValue()); // Client 1 replace - boolean c1Result = client1.replace(key, "valueC1-1", stringSerializer, stringSerializer, c1.getRevision()); + c1.setValue("valueC1-1"); + boolean c1Result = client1.replace(c1, stringSerializer, stringSerializer); assertTrue("C1 should be able to replace the key", c1Result); // Client 2 replace with the old revision - boolean c2Result = client2.replace(key, "valueC2-1", stringSerializer, stringSerializer, c2.getRevision()); + c2.setValue("valueC2-1"); + boolean c2Result = client2.replace(c2, stringSerializer, stringSerializer); assertFalse("C2 shouldn't be able to replace the key", c2Result); // Client 2 fetch the key again @@ -607,7 +609,8 @@ public class TestServerAndClient { assertEquals(1, c2.getRevision()); // Now, Client 2 knows the correct revision so it can replace the key - c2Result = client2.replace(key, "valueC2-2", stringSerializer, stringSerializer, c2.getRevision()); + c2.setValue("valueC2-2"); + c2Result = client2.replace(c2, stringSerializer, stringSerializer); assertTrue("C2 should be able to replace the key", c2Result); // Assert the cache @@ -678,7 +681,8 @@ public class TestServerAndClient { } try { - client.replace(key, "value2", stringSerializer, stringSerializer, 0L); + AtomicCacheEntry entry = new AtomicCacheEntry<>(key, "value2", 0L); + client.replace(entry, stringSerializer, stringSerializer); fail("Version 2 operations should NOT work."); } catch (UnsupportedOperationException e) { }