diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java
new file mode 100644
index 0000000000..d0b77e1165
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/AtomicDistributedMapCacheClient.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+
+import java.io.IOException;
+
+/**
+ *
This interface defines an API that can be used for interacting with a
+ * Distributed Cache that functions similarly to a {@link java.util.Map Map}.
+ *
+ *
In addition to the API defined in {@link DistributedMapCacheClient} super class,
+ * this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2.
+ *
+ *
If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException.
+ */
+@Tags({"distributed", "client", "cluster", "map", "cache"})
+@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows "
+ + "multiple nodes to coordinate state with a single remote entity.")
+public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient {
+
+ interface CacheEntry {
+
+ long getRevision();
+
+ K getKey();
+
+ V getValue();
+
+ }
+
+ /**
+ * Fetch a CacheEntry with a key.
+ * @param the key type
+ * @param the value type
+ * @param key the key to lookup in the map
+ * @param keySerializer key serializer
+ * @param valueDeserializer value deserializer
+ * @return A CacheEntry instance if one exists, otherwise null.
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ CacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException;
+
+ /**
+ * Replace an existing key with new value.
+ * @param the key type
+ * @param the value type
+ * @param key the key to replace
+ * @param value the new value for the key
+ * @param keySerializer key serializer
+ * @param valueSerializer value serializer
+ * @param revision a revision that was retrieved by a preceding fetch operation, if the key is already updated by other client,
+ * this doesn't match with the one on server, therefore the replace operation will not be performed.
+ * If there's no existing entry for the key, any revision can replace the key.
+ * @return true only if the key is replaced.
+ * @throws IOException if unable to communicate with the remote instance
+ */
+ boolean replace(K key, V value, Serializer keySerializer, Serializer valueSerializer, long revision) throws IOException;
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
index c035485b31..83b3b909cd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
@@ -43,4 +43,8 @@ public interface CommsSession extends Closeable {
long getTimeout(TimeUnit timeUnit);
SSLContext getSSLContext();
+
+ int getProtocolVersion();
+
+ void setProtocolVersion(final int protocolVersion);
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index e78ae0b1f9..5379bc1b20 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -16,7 +16,9 @@
*/
package org.apache.nifi.distributed.cache.client;
+import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -39,8 +41,6 @@ import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
+ "between nodes in a NiFi cluster")
-public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
+public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient {
private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
@@ -216,6 +216,58 @@ public class DistributedMapCacheClientService extends AbstractControllerService
});
}
+ @Override
+ public CacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException {
+ return withCommsSession(session -> {
+ validateProtocolVersion(session, 2);
+
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("fetch");
+
+ serialize(key, keySerializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ final long revision = dis.readLong();
+ final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+
+ if (revision < 0) {
+ // This indicates that key was not found.
+ return null;
+ }
+
+ final StandardCacheEntry standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision);
+ return standardCacheEntry;
+ });
+ }
+
+ private void validateProtocolVersion(final CommsSession session, final int requiredProtocolVersion) {
+ if (session.getProtocolVersion() < requiredProtocolVersion) {
+ throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + requiredProtocolVersion);
+ }
+ }
+
+ @Override
+ public boolean replace(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final long revision) throws IOException {
+ return withCommsSession(session -> {
+ validateProtocolVersion(session, 2);
+
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("replace");
+
+ serialize(key, keySerializer, dos);
+ dos.writeLong(revision);
+ serialize(value, valueSerializer, dos);
+
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ });
+ }
+
private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
final int responseLength = dis.readInt();
final byte[] responseBuffer = new byte[responseLength];
@@ -247,9 +299,10 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}
session = createCommsSession(configContext);
- final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+ session.setProtocolVersion(versionNegotiator.getVersion());
} catch (final HandshakeException e) {
try {
session.close();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
index 82ab643f8f..c1fa274eb2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
@@ -127,6 +127,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+ session.setProtocolVersion(versionNegotiator.getVersion());
} catch (final HandshakeException e) {
IOUtils.closeQuietly(session);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
index 3d400bb123..7576c5f4e4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
@@ -42,6 +42,8 @@ public class SSLCommsSession implements CommsSession {
private final SSLSocketChannelOutputStream out;
private final BufferedOutputStream bufferedOut;
+ private int protocolVersion;
+
public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
@@ -106,4 +108,13 @@ public class SSLCommsSession implements CommsSession {
return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
}
+ @Override
+ public int getProtocolVersion() {
+ return protocolVersion;
+ }
+
+ @Override
+ public void setProtocolVersion(final int protocolVersion) {
+ this.protocolVersion = protocolVersion;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java
new file mode 100644
index 0000000000..b4949d50f3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCacheEntry.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+public class StandardCacheEntry implements AtomicDistributedMapCacheClient.CacheEntry {
+
+ private final K key;
+ private final V value;
+ private final long revision;
+
+
+ public StandardCacheEntry(final K key, final V value, final long revision) {
+ this.key = key;
+ this.value = value;
+ this.revision = revision;
+ }
+
+ @Override
+ public long getRevision() {
+ return revision;
+ }
+
+ @Override
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public V getValue() {
+ return value;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
index b2a5c1d338..6a8ee45a3d 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
@@ -45,6 +45,8 @@ public class StandardCommsSession implements CommsSession {
private final SocketChannelOutputStream out;
private final InterruptableOutputStream bufferedOut;
+ private int protocolVersion;
+
public StandardCommsSession(final String hostname, final int port) throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
socketChannel.configureBlocking(false);
@@ -122,4 +124,15 @@ public class StandardCommsSession implements CommsSession {
public long getTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
}
+
+ @Override
+ public int getProtocolVersion() {
+ return protocolVersion;
+ }
+
+ @Override
+ public void setProtocolVersion(final int protocolVersion) {
+ this.protocolVersion = protocolVersion;
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
index f36ac15066..3df2f094bf 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
@@ -33,6 +33,19 @@ public class ProtocolHandshake {
public static final int DIFFERENT_RESOURCE_VERSION = 21;
public static final int ABORT = 255;
+ /**
+ * Initiate handshake to ensure client and server can communicate with the same protocol.
+ * If the server doesn't support requested protocol version, HandshakeException will be thrown.
+ *
+ * DistributedMapCache version histories:
+ * - 2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.
+ * - 1: Initial version.
+ *
+ *
+ * DistributedSetCache version histories:
+ * - 1: Initial version.
+ *
+ */
public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out);
@@ -85,6 +98,7 @@ public class ProtocolHandshake {
// Attempt negotiation of resource based on our new preferred version.
initiateVersionNegotiation(negotiator, dis, dos);
+ return;
case ABORT:
throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default:
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index 5c5a9cb3c9..1968162a7f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.distributed.cache.server;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,8 +32,6 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
@@ -121,9 +121,9 @@ public abstract class AbstractCacheServer implements CacheServer {
return;
}
try (final InputStream in = new BufferedInputStream(rawInputStream);
- final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
+ final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
- final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+ final VersionNegotiator versionNegotiator = getVersionNegotiator();
ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
@@ -163,6 +163,14 @@ public abstract class AbstractCacheServer implements CacheServer {
thread.start();
}
+ /**
+ * Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
+ * for details of each version enhancements.
+ */
+ protected StandardVersionNegotiator getVersionNegotiator() {
+ return new StandardVersionNegotiator(1);
+ }
+
@Override
public void stop() throws IOException {
stopped = true;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
index dce7ccd488..6b8515593b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map;
import java.io.File;
+import java.io.IOException;
import javax.net.ssl.SSLContext;
@@ -69,10 +70,14 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
try {
final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
- return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+ return createMapCacheServer(port, maxSize, sslContext, evictionPolicy, persistenceDir);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
+ protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
+ return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
+ }
+
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
index e9c6f1dfb3..67f5babb5b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCache.java
@@ -31,5 +31,9 @@ public interface MapCache {
ByteBuffer remove(ByteBuffer key) throws IOException;
+ MapCacheRecord fetch(ByteBuffer key) throws IOException;
+
+ MapPutResult replace(MapCacheRecord record) throws IOException;
+
void shutdown() throws IOException;
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
index ff032b19a7..84af198d06 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheRecord.java
@@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import org.apache.nifi.distributed.cache.server.CacheRecord;
@@ -24,10 +25,19 @@ public class MapCacheRecord extends CacheRecord {
private final ByteBuffer key;
private final ByteBuffer value;
+ /**
+ * Revision is a number that increases every time the key is updated.
+ */
+ private final long revision;
public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
+ this(key, value, -1L);
+ }
+
+ public MapCacheRecord(final ByteBuffer key, final ByteBuffer value, final long revision) {
this.key = key;
this.value = value;
+ this.revision = revision;
}
public ByteBuffer getKey() {
@@ -40,7 +50,7 @@ public class MapCacheRecord extends CacheRecord {
@Override
public int hashCode() {
- return 2938476 + key.hashCode() * value.hashCode();
+ return Arrays.hashCode(new Object[]{key, value, revision});
}
@Override
@@ -51,9 +61,13 @@ public class MapCacheRecord extends CacheRecord {
if (obj instanceof MapCacheRecord) {
final MapCacheRecord that = ((MapCacheRecord) obj);
- return key.equals(that.key) && value.equals(that.value);
+ return key.equals(that.key) && value.equals(that.value) && revision == that.revision;
}
return false;
}
+
+ public long getRevision() {
+ return revision;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 13ed0df1ca..99eacd77b3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -27,7 +28,8 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
-import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
public class MapCacheServer extends AbstractCacheServer {
@@ -48,6 +50,14 @@ public class MapCacheServer extends AbstractCacheServer {
}
}
+ /**
+ * Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
+ * for details of each version enhancements.
+ */
+ protected StandardVersionNegotiator getVersionNegotiator() {
+ return new StandardVersionNegotiator(2, 1);
+ }
+
@Override
protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
final DataInputStream dis = new DataInputStream(in);
@@ -88,7 +98,7 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeInt(0);
} else {
// we didn't put. Write back the previous value
- final byte[] byteArray = putResult.getExistingValue().array();
+ final byte[] byteArray = putResult.getExisting().getValue().array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
@@ -99,10 +109,10 @@ public class MapCacheServer extends AbstractCacheServer {
final byte[] key = readValue(dis);
final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
if (existingValue == null) {
- // there was no existing value; we did a "put".
+ // there was no existing value.
dos.writeInt(0);
} else {
- // a value already existed. we did not update the map
+ // a value already existed.
final byte[] byteArray = existingValue.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
@@ -116,6 +126,31 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeBoolean(removed);
break;
}
+ case "fetch": {
+ final byte[] key = readValue(dis);
+ final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
+ if (existing == null) {
+ // there was no existing value.
+ dos.writeLong(-1);
+ dos.writeInt(0);
+ } else {
+ // a value already existed.
+ dos.writeLong(existing.getRevision());
+ final byte[] byteArray = existing.getValue().array();
+ dos.writeInt(byteArray.length);
+ dos.write(byteArray);
+ }
+
+ break;
+ }
+ case "replace": {
+ final byte[] key = readValue(dis);
+ final long revision = dis.readLong();
+ final byte[] value = readValue(dis);
+ final MapPutResult result = cache.replace(new MapCacheRecord(ByteBuffer.wrap(key), ByteBuffer.wrap(value), revision));
+ dos.writeBoolean(result.isSuccessful());
+ break;
+ }
default: {
throw new IOException("Illegal Request");
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
index d0055f3ab6..3750aee490 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapPutResult.java
@@ -16,45 +16,33 @@
*/
package org.apache.nifi.distributed.cache.server.map;
-import java.nio.ByteBuffer;
-
public class MapPutResult {
private final boolean successful;
- private final ByteBuffer key, value;
- private final ByteBuffer existingValue;
- private final ByteBuffer evictedKey, evictedValue;
+ private final MapCacheRecord record;
+ private final MapCacheRecord existing;
+ private final MapCacheRecord evicted;
- public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) {
+ public MapPutResult(boolean successful, MapCacheRecord record, MapCacheRecord existing, MapCacheRecord evicted) {
this.successful = successful;
- this.key = key;
- this.value = value;
- this.existingValue = existingValue;
- this.evictedKey = evictedKey;
- this.evictedValue = evictedValue;
+ this.record = record;
+ this.existing = existing;
+ this.evicted = evicted;
}
public boolean isSuccessful() {
return successful;
}
- public ByteBuffer getKey() {
- return key;
+ public MapCacheRecord getRecord() {
+ return record;
}
- public ByteBuffer getValue() {
- return value;
+ public MapCacheRecord getExisting() {
+ return existing;
}
- public ByteBuffer getExistingValue() {
- return existingValue;
- }
-
- public ByteBuffer getEvictedKey() {
- return evictedKey;
- }
-
- public ByteBuffer getEvictedValue() {
- return evictedValue;
+ public MapCacheRecord getEvicted() {
+ return evicted;
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
index c2fc0d7600..62deae584d 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java
@@ -57,38 +57,27 @@ public class PersistentMapCache implements MapCache {
@Override
public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
final MapPutResult putResult = wrapped.putIfAbsent(key, value);
- if (putResult.isSuccessful()) {
- // The put was successful.
- final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
- final List records = new ArrayList<>();
- records.add(record);
-
- if (putResult.getEvictedKey() != null) {
- records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
- }
-
- wali.update(Collections.singletonList(record), false);
-
- final long modCount = modifications.getAndIncrement();
- if (modCount > 0 && modCount % 100000 == 0) {
- wali.checkpoint();
- }
- }
-
+ putWriteAheadLog(key, value, putResult);
return putResult;
}
@Override
public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
final MapPutResult putResult = wrapped.put(key, value);
+ putWriteAheadLog(key, value, putResult);
+ return putResult;
+ }
+
+ protected void putWriteAheadLog(ByteBuffer key, ByteBuffer value, MapPutResult putResult) throws IOException {
if ( putResult.isSuccessful() ) {
// The put was successful.
final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
final List records = new ArrayList<>();
records.add(record);
- if ( putResult.getEvictedKey() != null ) {
- records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
+ final MapCacheRecord evicted = putResult.getEvicted();
+ if ( evicted != null ) {
+ records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue()));
}
wali.update(Collections.singletonList(record), false);
@@ -98,8 +87,6 @@ public class PersistentMapCache implements MapCache {
wali.checkpoint();
}
}
-
- return putResult;
}
@Override
@@ -112,6 +99,18 @@ public class PersistentMapCache implements MapCache {
return wrapped.get(key);
}
+ @Override
+ public MapCacheRecord fetch(ByteBuffer key) throws IOException {
+ return wrapped.fetch(key);
+ }
+
+ @Override
+ public MapPutResult replace(MapCacheRecord record) throws IOException {
+ final MapPutResult putResult = wrapped.replace(record);
+ putWriteAheadLog(record.getKey(), record.getValue(), putResult);
+ return putResult;
+ }
+
@Override
public ByteBuffer remove(final ByteBuffer key) throws IOException {
final ByteBuffer removeResult = wrapped.remove(key);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
index b167c62e65..ebcf91ac46 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java
@@ -84,16 +84,7 @@ public class SimpleMapCache implements MapCache {
final MapCacheRecord record = cache.get(key);
if (record == null) {
// Record is null. We will add.
- final MapCacheRecord evicted = evict();
- final MapCacheRecord newRecord = new MapCacheRecord(key, value);
- cache.put(key, newRecord);
- inverseCacheMap.put(newRecord, key);
-
- if (evicted == null) {
- return new MapPutResult(true, key, value, null, null, null);
- } else {
- return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
- }
+ return put(key, value, record);
}
// Record is not null. Increment hit count and return result indicating that record was not added.
@@ -101,29 +92,37 @@ public class SimpleMapCache implements MapCache {
record.hit();
inverseCacheMap.put(record, key);
- return new MapPutResult(false, key, value, record.getValue(), null, null);
+ return new MapPutResult(false, record, record, null);
} finally {
writeLock.unlock();
}
}
+ private MapPutResult put(final ByteBuffer key, final ByteBuffer value, final MapCacheRecord existing) {
+ // evict if we need to in order to make room for a new entry.
+ final MapCacheRecord evicted = evict();
+
+ final long revision;
+ if (existing == null) {
+ revision = 0;
+ } else {
+ revision = existing.getRevision() + 1;
+ inverseCacheMap.remove(existing);
+ }
+
+ final MapCacheRecord record = new MapCacheRecord(key, value, revision);
+ cache.put(key, record);
+ inverseCacheMap.put(record, key);
+
+ return new MapPutResult(true, record, existing, evicted);
+ }
@Override
- public MapPutResult put(final ByteBuffer key, final ByteBuffer value) {
+ public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
writeLock.lock();
try {
- // evict if we need to in order to make room for a new entry.
- final MapCacheRecord evicted = evict();
-
- final MapCacheRecord record = new MapCacheRecord(key, value);
- final MapCacheRecord existing = cache.put(key, record);
- inverseCacheMap.put(record, key);
-
- final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
- final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
- final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
-
- return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
+ final MapCacheRecord existing = cache.get(key);
+ return put(key, value, existing);
} finally {
writeLock.unlock();
}
@@ -182,6 +181,43 @@ public class SimpleMapCache implements MapCache {
}
}
+ @Override
+ public MapCacheRecord fetch(ByteBuffer key) throws IOException {
+ readLock.lock();
+ try {
+ final MapCacheRecord record = cache.get(key);
+ if (record == null) {
+ return null;
+ }
+
+ inverseCacheMap.remove(record);
+ record.hit();
+ inverseCacheMap.put(record, key);
+
+ return record;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public MapPutResult replace(MapCacheRecord inputRecord) throws IOException {
+ writeLock.lock();
+ try {
+ final ByteBuffer key = inputRecord.getKey();
+ final ByteBuffer value = inputRecord.getValue();
+ final MapCacheRecord existing = fetch(key);
+ if (existing != null && inputRecord.getRevision() != existing.getRevision()) {
+ // The key has been updated by other operation.
+ return new MapPutResult(false, inputRecord, existing, null);
+ }
+
+ return put(key, value, existing);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
@Override
public void shutdown() throws IOException {
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index 82e4a9912d..c8f94788fb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -33,14 +34,17 @@ import java.util.Map;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
+import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
@@ -52,6 +56,8 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
+
public class TestServerAndClient {
private static Logger LOGGER;
@@ -450,6 +456,157 @@ public class TestServerAndClient {
server.shutdownServer();
}
+ @Test
+ public void testOptimisticLock() throws Exception {
+
+ /**
+ * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
+ * See: https://issues.apache.org/jira/browse/NIFI-437
+ */
+ Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+ SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
+ LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
+
+ // Create server
+ final DistributedMapCacheServer server = new MapServer();
+ final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+ runner.addControllerService("server", server);
+ runner.enableControllerService(server);
+
+ DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
+ client1.initialize(clientInitContext1);
+
+ DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
+ client1.initialize(clientInitContext2);
+
+ final Map clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+ clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+ MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+ client1.cacheConfig(clientContext1);
+ MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
+ client2.cacheConfig(clientContext2);
+
+ final Serializer stringSerializer = new StringSerializer();
+ final Deserializer stringDeserializer = new StringDeserializer();
+
+ final String key = "test-optimistic-lock";
+
+ // Ensure there's no existing key
+ assertFalse(client1.containsKey(key, stringSerializer));
+ assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
+
+ // Client 1 inserts the key.
+ client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
+
+ // Client 1 and 2 fetch the key
+ AtomicDistributedMapCacheClient.CacheEntry c1 = client1.fetch(key, stringSerializer, stringDeserializer);
+ AtomicDistributedMapCacheClient.CacheEntry c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+ assertEquals(0, c1.getRevision());
+ assertEquals("valueC1-0", c1.getValue());
+ assertEquals(0, c2.getRevision());
+ assertEquals("valueC1-0", c2.getValue());
+
+ // Client 1 replace
+ boolean c1Result = client1.replace(key, "valueC1-1", stringSerializer, stringSerializer, c1.getRevision());
+ assertTrue("C1 should be able to replace the key", c1Result);
+ // Client 2 replace with the old revision
+ boolean c2Result = client2.replace(key, "valueC2-1", stringSerializer, stringSerializer, c2.getRevision());
+ assertFalse("C2 shouldn't be able to replace the key", c2Result);
+
+ // Client 2 fetch the key again
+ c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+ assertEquals("valueC1-1", c2.getValue());
+ assertEquals(1, c2.getRevision());
+
+ // Now, Client 2 knows the correct revision so it can replace the key
+ c2Result = client2.replace(key, "valueC2-2", stringSerializer, stringSerializer, c2.getRevision());
+ assertTrue("C2 should be able to replace the key", c2Result);
+
+ // Assert the cache
+ c2 = client2.fetch(key, stringSerializer, stringDeserializer);
+ assertEquals("valueC2-2", c2.getValue());
+ assertEquals(2, c2.getRevision());
+
+ client1.close();
+ client2.close();
+ server.shutdownServer();
+ }
+
+ @Test
+ public void testBackwardCompatibility() throws Exception {
+
+ /**
+ * This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
+ * See: https://issues.apache.org/jira/browse/NIFI-437
+ */
+ Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
+ SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
+
+ LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
+
+ final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
+
+ // Create a server that only supports protocol version 1.
+ final DistributedMapCacheServer server = new MapServer() {
+ @Override
+ protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
+ return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir) {
+ @Override
+ protected StandardVersionNegotiator getVersionNegotiator() {
+ return new StandardVersionNegotiator(1);
+ }
+ };
+ }
+ };
+ runner.addControllerService("server", server);
+ runner.enableControllerService(server);
+
+ DistributedMapCacheClientService client = new DistributedMapCacheClientService();
+ MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
+ client.initialize(clientInitContext1);
+
+ final Map clientProperties = new HashMap<>();
+ clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
+ clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
+ clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
+
+ MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
+ client.cacheConfig(clientContext);
+
+ final Serializer stringSerializer = new StringSerializer();
+ final Deserializer stringDeserializer = new StringDeserializer();
+
+ final String key = "test-backward-compatibility";
+
+ // Version 1 operations should work
+ client.put(key, "value1", stringSerializer, stringSerializer);
+ assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
+
+ assertTrue(client.containsKey(key, stringSerializer));
+
+ try {
+ client.fetch(key, stringSerializer, stringDeserializer);
+ fail("Version 2 operations should NOT work.");
+ } catch (UnsupportedOperationException e) {
+ }
+
+ try {
+ client.replace(key, "value2", stringSerializer, stringSerializer, 0L);
+ fail("Version 2 operations should NOT work.");
+ } catch (UnsupportedOperationException e) {
+ }
+
+ client.close();
+ server.shutdownServer();
+ }
+
+
private void waitABit() {
try {
Thread.sleep(10L);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
new file mode 100644
index 0000000000..2e19714c10
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestSimpleMapCache.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.server.map;
+
+import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestSimpleMapCache {
+
+ @Test
+ public void testBasicOperations() throws Exception {
+ final SimpleMapCache cache = new SimpleMapCache("service-id", 2, EvictionPolicy.FIFO);
+
+ final ByteBuffer key1 = ByteBuffer.wrap("key1".getBytes());
+ final ByteBuffer key2 = ByteBuffer.wrap("key2".getBytes());
+ final ByteBuffer key3 = ByteBuffer.wrap("key3".getBytes());
+ ByteBuffer value1 = ByteBuffer.wrap("value1-0".getBytes());
+ ByteBuffer value2 = ByteBuffer.wrap("value2-0".getBytes());
+ ByteBuffer value3 = ByteBuffer.wrap("value3-0".getBytes());
+
+ // Initial state.
+ assertNull(cache.get(key1));
+ assertNull(cache.fetch(key1));
+
+ // Put the 1st key.
+ MapPutResult putResult = cache.put(key1, value1);
+ assertTrue(putResult.isSuccessful());
+ assertNull(putResult.getExisting());
+ assertNull(putResult.getEvicted());
+ assertEquals(0, putResult.getRecord().getRevision());
+
+ // Update the same key.
+ value1 = ByteBuffer.wrap("value1-1".getBytes());
+ putResult = cache.put(key1, value1);
+ assertTrue(putResult.isSuccessful());
+ assertNotNull(putResult.getExisting());
+ assertEquals(1, putResult.getRecord().getRevision());
+ assertEquals(key1, putResult.getExisting().getKey());
+ assertEquals("value1-0", new String(putResult.getExisting().getValue().array()));
+ assertNull(putResult.getEvicted());
+
+ // Put the 2nd key.
+ putResult = cache.put(key2, value2);
+ assertTrue(putResult.isSuccessful());
+ assertNull(putResult.getExisting());
+ assertNull(putResult.getEvicted());
+ assertEquals(0, putResult.getRecord().getRevision());
+
+ // Put the 3rd key.
+ putResult = cache.put(key3, value3);
+ assertTrue(putResult.isSuccessful());
+ assertNull(putResult.getExisting());
+ assertNotNull("The first key should be evicted", putResult.getEvicted());
+ assertEquals("key1", new String(putResult.getEvicted().getKey().array()));
+ assertEquals("value1-1", new String(putResult.getEvicted().getValue().array()));
+ assertEquals(0, putResult.getRecord().getRevision());
+
+ // Delete 2nd key.
+ ByteBuffer removed = cache.remove(key2);
+ assertNotNull(removed);
+ assertEquals("value2-0", new String(removed.array()));
+
+ // Put the 2nd key again.
+ putResult = cache.put(key2, value2);
+ assertTrue(putResult.isSuccessful());
+ assertNull(putResult.getExisting());
+ assertNull(putResult.getEvicted());
+ assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision());
+
+ }
+
+ @Test
+ public void testOptimisticLock() throws Exception {
+
+ final SimpleMapCache cache = new SimpleMapCache("service-id", 2, EvictionPolicy.FIFO);
+
+ final ByteBuffer key = ByteBuffer.wrap("key1".getBytes());
+ ByteBuffer valueC1 = ByteBuffer.wrap("valueC1-0".getBytes());
+ ByteBuffer valueC2 = ByteBuffer.wrap("valueC2-0".getBytes());
+
+ assertNull("If there's no existing key, fetch should return null.", cache.fetch(key));
+
+ // Client 1 inserts the key.
+ MapCacheRecord c1 = new MapCacheRecord(key, valueC1);
+ MapPutResult putResult = cache.replace(c1);
+ assertTrue("Replace should succeed if there's no existing key.", putResult.isSuccessful());
+
+ MapCacheRecord c2 = new MapCacheRecord(key, valueC2);
+ putResult = cache.replace(c2);
+ assertFalse("Replace should fail.", putResult.isSuccessful());
+
+ // Client 1 and 2 fetch the key
+ c1 = cache.fetch(key);
+ c2 = cache.fetch(key);
+ assertEquals(0, c1.getRevision());
+ assertEquals(0, c2.getRevision());
+
+ // Client 1 replace
+ valueC1 = ByteBuffer.wrap("valueC1-1".getBytes());
+ putResult = cache.replace(new MapCacheRecord(key, valueC1, c1.getRevision()));
+ assertTrue("Replace should succeed since revision matched.", putResult.isSuccessful());
+ assertEquals(1, putResult.getRecord().getRevision());
+
+ // Client 2 replace with the old revision
+ valueC2 = ByteBuffer.wrap("valueC2-1".getBytes());
+ putResult = cache.replace(new MapCacheRecord(key, valueC2, c2.getRevision()));
+ assertFalse("Replace should fail.", putResult.isSuccessful());
+ }
+
+}