NIFI-3214: Added fetch and replace to DistributedMapCache

- Using fetch and replace together can provide optimistic locking for
  concurrency control.
- Added fetch to get cache entry with its meta data such as revision
  number.
- Added replace to update cache only if it has not been updated.
- Added Map Cache protocol version 2 for those new operations.
- Existing operations such as get or put can work with protocol version
  1.

This closes #1410.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Koji Kawamura 2017-01-11 15:57:40 +09:00 committed by Bryan Bende
parent 8c7539b20a
commit ef54a8ec69
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
18 changed files with 682 additions and 86 deletions

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.client;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import java.io.IOException;
/**
* <p>This interface defines an API that can be used for interacting with a
* Distributed Cache that functions similarly to a {@link java.util.Map Map}.
*
* <p>In addition to the API defined in {@link DistributedMapCacheClient} super class,
* this class provides methods for concurrent atomic updates those are added since Map Cache protocol version 2.
*
* <p>If a remote cache server doesn't support Map Cache protocol version 2, these methods throw UnsupportedOperationException.
*/
@Tags({"distributed", "client", "cluster", "map", "cache"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This allows "
+ "multiple nodes to coordinate state with a single remote entity.")
public interface AtomicDistributedMapCacheClient extends DistributedMapCacheClient {
interface CacheEntry<K, V> {
long getRevision();
K getKey();
V getValue();
}
/**
* Fetch a CacheEntry with a key.
* @param <K> the key type
* @param <V> the value type
* @param key the key to lookup in the map
* @param keySerializer key serializer
* @param valueDeserializer value deserializer
* @return A CacheEntry instance if one exists, otherwise <cod>null</cod>.
* @throws IOException if unable to communicate with the remote instance
*/
<K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException;
/**
* Replace an existing key with new value.
* @param <K> the key type
* @param <V> the value type
* @param key the key to replace
* @param value the new value for the key
* @param keySerializer key serializer
* @param valueSerializer value serializer
* @param revision a revision that was retrieved by a preceding fetch operation, if the key is already updated by other client,
* this doesn't match with the one on server, therefore the replace operation will not be performed.
* If there's no existing entry for the key, any revision can replace the key.
* @return true only if the key is replaced.
* @throws IOException if unable to communicate with the remote instance
*/
<K, V> boolean replace(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, long revision) throws IOException;
}

View File

@ -43,4 +43,8 @@ public interface CommsSession extends Closeable {
long getTimeout(TimeUnit timeUnit); long getTimeout(TimeUnit timeUnit);
SSLContext getSSLContext(); SSLContext getSSLContext();
int getProtocolVersion();
void setProtocolVersion(final int protocolVersion);
} }

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.nifi.distributed.cache.client; package org.apache.nifi.distributed.cache.client;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -39,8 +41,6 @@ import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth; import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
@SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"}) @SeeAlso(classNames = {"org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer", "org.apache.nifi.ssl.StandardSSLContextService"})
@CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map " @CapabilityDescription("Provides the ability to communicate with a DistributedMapCacheServer. This can be used in order to share a Map "
+ "between nodes in a NiFi cluster") + "between nodes in a NiFi cluster")
public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { public class DistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient {
private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class); private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
@ -216,6 +216,58 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}); });
} }
@Override
public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(session -> {
validateProtocolVersion(session, 2);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("fetch");
serialize(key, keySerializer, dos);
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
final long revision = dis.readLong();
final byte[] responseBuffer = readLengthDelimitedResponse(dis);
if (revision < 0) {
// This indicates that key was not found.
return null;
}
final StandardCacheEntry<K, V> standardCacheEntry = new StandardCacheEntry<>(key, valueDeserializer.deserialize(responseBuffer), revision);
return standardCacheEntry;
});
}
private void validateProtocolVersion(final CommsSession session, final int requiredProtocolVersion) {
if (session.getProtocolVersion() < requiredProtocolVersion) {
throw new UnsupportedOperationException("Remote cache server doesn't support protocol version " + requiredProtocolVersion);
}
}
@Override
public <K, V> boolean replace(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final long revision) throws IOException {
return withCommsSession(session -> {
validateProtocolVersion(session, 2);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("replace");
serialize(key, keySerializer, dos);
dos.writeLong(revision);
serialize(value, valueSerializer, dos);
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
return dis.readBoolean();
});
}
private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException { private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
final int responseLength = dis.readInt(); final int responseLength = dis.readInt();
final byte[] responseBuffer = new byte[responseLength]; final byte[] responseBuffer = new byte[responseLength];
@ -247,9 +299,10 @@ public class DistributedMapCacheClientService extends AbstractControllerService
} }
session = createCommsSession(configContext); session = createCommsSession(configContext);
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(2, 1);
try { try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
session.setProtocolVersion(versionNegotiator.getVersion());
} catch (final HandshakeException e) { } catch (final HandshakeException e) {
try { try {
session.close(); session.close();

View File

@ -127,6 +127,7 @@ public class DistributedSetCacheClientService extends AbstractControllerService
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
try { try {
ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator); ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
session.setProtocolVersion(versionNegotiator.getVersion());
} catch (final HandshakeException e) { } catch (final HandshakeException e) {
IOUtils.closeQuietly(session); IOUtils.closeQuietly(session);

View File

@ -42,6 +42,8 @@ public class SSLCommsSession implements CommsSession {
private final SSLSocketChannelOutputStream out; private final SSLSocketChannelOutputStream out;
private final BufferedOutputStream bufferedOut; private final BufferedOutputStream bufferedOut;
private int protocolVersion;
public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException { public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true); sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
@ -106,4 +108,13 @@ public class SSLCommsSession implements CommsSession {
return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS); return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
} }
@Override
public int getProtocolVersion() {
return protocolVersion;
}
@Override
public void setProtocolVersion(final int protocolVersion) {
this.protocolVersion = protocolVersion;
}
} }

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.client;
public class StandardCacheEntry<K,V> implements AtomicDistributedMapCacheClient.CacheEntry<K,V> {
private final K key;
private final V value;
private final long revision;
public StandardCacheEntry(final K key, final V value, final long revision) {
this.key = key;
this.value = value;
this.revision = revision;
}
@Override
public long getRevision() {
return revision;
}
@Override
public K getKey() {
return key;
}
@Override
public V getValue() {
return value;
}
}

View File

@ -45,6 +45,8 @@ public class StandardCommsSession implements CommsSession {
private final SocketChannelOutputStream out; private final SocketChannelOutputStream out;
private final InterruptableOutputStream bufferedOut; private final InterruptableOutputStream bufferedOut;
private int protocolVersion;
public StandardCommsSession(final String hostname, final int port) throws IOException { public StandardCommsSession(final String hostname, final int port) throws IOException {
socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
socketChannel.configureBlocking(false); socketChannel.configureBlocking(false);
@ -122,4 +124,15 @@ public class StandardCommsSession implements CommsSession {
public long getTimeout(final TimeUnit timeUnit) { public long getTimeout(final TimeUnit timeUnit) {
return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS); return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
} }
@Override
public int getProtocolVersion() {
return protocolVersion;
}
@Override
public void setProtocolVersion(final int protocolVersion) {
this.protocolVersion = protocolVersion;
}
} }

View File

@ -33,6 +33,19 @@ public class ProtocolHandshake {
public static final int DIFFERENT_RESOURCE_VERSION = 21; public static final int DIFFERENT_RESOURCE_VERSION = 21;
public static final int ABORT = 255; public static final int ABORT = 255;
/**
* <p>Initiate handshake to ensure client and server can communicate with the same protocol.
* If the server doesn't support requested protocol version, HandshakeException will be thrown.</p>
*
* <p>DistributedMapCache version histories:<ul>
* <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li>
* <li>1: Initial version.</li>
* </ul></p>
*
* <p>DistributedSetCache version histories:<ul>
* <li>1: Initial version.</li>
* </ul></p>
*/
public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException { public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
final DataInputStream dis = new DataInputStream(in); final DataInputStream dis = new DataInputStream(in);
final DataOutputStream dos = new DataOutputStream(out); final DataOutputStream dos = new DataOutputStream(out);
@ -85,6 +98,7 @@ public class ProtocolHandshake {
// Attempt negotiation of resource based on our new preferred version. // Attempt negotiation of resource based on our new preferred version.
initiateVersionNegotiation(negotiator, dis, dos); initiateVersionNegotiation(negotiator, dis, dos);
return;
case ABORT: case ABORT:
throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
default: default:

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.distributed.cache.server; package org.apache.nifi.distributed.cache.server;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -30,8 +32,6 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake; import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException; import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream; import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
@ -121,9 +121,9 @@ public abstract class AbstractCacheServer implements CacheServer {
return; return;
} }
try (final InputStream in = new BufferedInputStream(rawInputStream); try (final InputStream in = new BufferedInputStream(rawInputStream);
final OutputStream out = new BufferedOutputStream(rawOutputStream)) { final OutputStream out = new BufferedOutputStream(rawOutputStream)) {
final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1); final VersionNegotiator versionNegotiator = getVersionNegotiator();
ProtocolHandshake.receiveHandshake(in, out, versionNegotiator); ProtocolHandshake.receiveHandshake(in, out, versionNegotiator);
@ -163,6 +163,14 @@ public abstract class AbstractCacheServer implements CacheServer {
thread.start(); thread.start();
} }
/**
* Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(1);
}
@Override @Override
public void stop() throws IOException { public void stop() throws IOException {
stopped = true; stopped = true;

View File

@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map; package org.apache.nifi.distributed.cache.server.map;
import java.io.File; import java.io.File;
import java.io.IOException;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -69,10 +70,14 @@ public class DistributedMapCacheServer extends DistributedCacheServer {
try { try {
final File persistenceDir = persistencePath == null ? null : new File(persistencePath); final File persistenceDir = persistencePath == null ? null : new File(persistencePath);
return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir); return createMapCacheServer(port, maxSize, sslContext, evictionPolicy, persistenceDir);
} catch (final Exception e) { } catch (final Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir);
}
} }

View File

@ -31,5 +31,9 @@ public interface MapCache {
ByteBuffer remove(ByteBuffer key) throws IOException; ByteBuffer remove(ByteBuffer key) throws IOException;
MapCacheRecord fetch(ByteBuffer key) throws IOException;
MapPutResult replace(MapCacheRecord record) throws IOException;
void shutdown() throws IOException; void shutdown() throws IOException;
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map; package org.apache.nifi.distributed.cache.server.map;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.nifi.distributed.cache.server.CacheRecord; import org.apache.nifi.distributed.cache.server.CacheRecord;
@ -24,10 +25,19 @@ public class MapCacheRecord extends CacheRecord {
private final ByteBuffer key; private final ByteBuffer key;
private final ByteBuffer value; private final ByteBuffer value;
/**
* Revision is a number that increases every time the key is updated.
*/
private final long revision;
public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) { public MapCacheRecord(final ByteBuffer key, final ByteBuffer value) {
this(key, value, -1L);
}
public MapCacheRecord(final ByteBuffer key, final ByteBuffer value, final long revision) {
this.key = key; this.key = key;
this.value = value; this.value = value;
this.revision = revision;
} }
public ByteBuffer getKey() { public ByteBuffer getKey() {
@ -40,7 +50,7 @@ public class MapCacheRecord extends CacheRecord {
@Override @Override
public int hashCode() { public int hashCode() {
return 2938476 + key.hashCode() * value.hashCode(); return Arrays.hashCode(new Object[]{key, value, revision});
} }
@Override @Override
@ -51,9 +61,13 @@ public class MapCacheRecord extends CacheRecord {
if (obj instanceof MapCacheRecord) { if (obj instanceof MapCacheRecord) {
final MapCacheRecord that = ((MapCacheRecord) obj); final MapCacheRecord that = ((MapCacheRecord) obj);
return key.equals(that.key) && value.equals(that.value); return key.equals(that.key) && value.equals(that.value) && revision == that.revision;
} }
return false; return false;
} }
public long getRevision() {
return revision;
}
} }

View File

@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server.map; package org.apache.nifi.distributed.cache.server.map;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -27,7 +28,8 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.distributed.cache.server.AbstractCacheServer; import org.apache.nifi.distributed.cache.server.AbstractCacheServer;
import org.apache.nifi.distributed.cache.server.EvictionPolicy; import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.apache.nifi.stream.io.DataOutputStream; import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
public class MapCacheServer extends AbstractCacheServer { public class MapCacheServer extends AbstractCacheServer {
@ -48,6 +50,14 @@ public class MapCacheServer extends AbstractCacheServer {
} }
} }
/**
* Refer {@link org.apache.nifi.distributed.cache.protocol.ProtocolHandshake#initiateHandshake(InputStream, OutputStream, VersionNegotiator)}
* for details of each version enhancements.
*/
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(2, 1);
}
@Override @Override
protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException { protected boolean listen(final InputStream in, final OutputStream out, final int version) throws IOException {
final DataInputStream dis = new DataInputStream(in); final DataInputStream dis = new DataInputStream(in);
@ -88,7 +98,7 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeInt(0); dos.writeInt(0);
} else { } else {
// we didn't put. Write back the previous value // we didn't put. Write back the previous value
final byte[] byteArray = putResult.getExistingValue().array(); final byte[] byteArray = putResult.getExisting().getValue().array();
dos.writeInt(byteArray.length); dos.writeInt(byteArray.length);
dos.write(byteArray); dos.write(byteArray);
} }
@ -99,10 +109,10 @@ public class MapCacheServer extends AbstractCacheServer {
final byte[] key = readValue(dis); final byte[] key = readValue(dis);
final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key)); final ByteBuffer existingValue = cache.get(ByteBuffer.wrap(key));
if (existingValue == null) { if (existingValue == null) {
// there was no existing value; we did a "put". // there was no existing value.
dos.writeInt(0); dos.writeInt(0);
} else { } else {
// a value already existed. we did not update the map // a value already existed.
final byte[] byteArray = existingValue.array(); final byte[] byteArray = existingValue.array();
dos.writeInt(byteArray.length); dos.writeInt(byteArray.length);
dos.write(byteArray); dos.write(byteArray);
@ -116,6 +126,31 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeBoolean(removed); dos.writeBoolean(removed);
break; break;
} }
case "fetch": {
final byte[] key = readValue(dis);
final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
if (existing == null) {
// there was no existing value.
dos.writeLong(-1);
dos.writeInt(0);
} else {
// a value already existed.
dos.writeLong(existing.getRevision());
final byte[] byteArray = existing.getValue().array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
case "replace": {
final byte[] key = readValue(dis);
final long revision = dis.readLong();
final byte[] value = readValue(dis);
final MapPutResult result = cache.replace(new MapCacheRecord(ByteBuffer.wrap(key), ByteBuffer.wrap(value), revision));
dos.writeBoolean(result.isSuccessful());
break;
}
default: { default: {
throw new IOException("Illegal Request"); throw new IOException("Illegal Request");
} }

View File

@ -16,45 +16,33 @@
*/ */
package org.apache.nifi.distributed.cache.server.map; package org.apache.nifi.distributed.cache.server.map;
import java.nio.ByteBuffer;
public class MapPutResult { public class MapPutResult {
private final boolean successful; private final boolean successful;
private final ByteBuffer key, value; private final MapCacheRecord record;
private final ByteBuffer existingValue; private final MapCacheRecord existing;
private final ByteBuffer evictedKey, evictedValue; private final MapCacheRecord evicted;
public MapPutResult(final boolean successful, final ByteBuffer key, final ByteBuffer value, final ByteBuffer existingValue, final ByteBuffer evictedKey, final ByteBuffer evictedValue) { public MapPutResult(boolean successful, MapCacheRecord record, MapCacheRecord existing, MapCacheRecord evicted) {
this.successful = successful; this.successful = successful;
this.key = key; this.record = record;
this.value = value; this.existing = existing;
this.existingValue = existingValue; this.evicted = evicted;
this.evictedKey = evictedKey;
this.evictedValue = evictedValue;
} }
public boolean isSuccessful() { public boolean isSuccessful() {
return successful; return successful;
} }
public ByteBuffer getKey() { public MapCacheRecord getRecord() {
return key; return record;
} }
public ByteBuffer getValue() { public MapCacheRecord getExisting() {
return value; return existing;
} }
public ByteBuffer getExistingValue() { public MapCacheRecord getEvicted() {
return existingValue; return evicted;
}
public ByteBuffer getEvictedKey() {
return evictedKey;
}
public ByteBuffer getEvictedValue() {
return evictedValue;
} }
} }

View File

@ -57,38 +57,27 @@ public class PersistentMapCache implements MapCache {
@Override @Override
public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException { public MapPutResult putIfAbsent(final ByteBuffer key, final ByteBuffer value) throws IOException {
final MapPutResult putResult = wrapped.putIfAbsent(key, value); final MapPutResult putResult = wrapped.putIfAbsent(key, value);
if (putResult.isSuccessful()) { putWriteAheadLog(key, value, putResult);
// The put was successful.
final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
final List<MapWaliRecord> records = new ArrayList<>();
records.add(record);
if (putResult.getEvictedKey() != null) {
records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue()));
}
wali.update(Collections.singletonList(record), false);
final long modCount = modifications.getAndIncrement();
if (modCount > 0 && modCount % 100000 == 0) {
wali.checkpoint();
}
}
return putResult; return putResult;
} }
@Override @Override
public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException { public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
final MapPutResult putResult = wrapped.put(key, value); final MapPutResult putResult = wrapped.put(key, value);
putWriteAheadLog(key, value, putResult);
return putResult;
}
protected void putWriteAheadLog(ByteBuffer key, ByteBuffer value, MapPutResult putResult) throws IOException {
if ( putResult.isSuccessful() ) { if ( putResult.isSuccessful() ) {
// The put was successful. // The put was successful.
final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value); final MapWaliRecord record = new MapWaliRecord(UpdateType.CREATE, key, value);
final List<MapWaliRecord> records = new ArrayList<>(); final List<MapWaliRecord> records = new ArrayList<>();
records.add(record); records.add(record);
if ( putResult.getEvictedKey() != null ) { final MapCacheRecord evicted = putResult.getEvicted();
records.add(new MapWaliRecord(UpdateType.DELETE, putResult.getEvictedKey(), putResult.getEvictedValue())); if ( evicted != null ) {
records.add(new MapWaliRecord(UpdateType.DELETE, evicted.getKey(), evicted.getValue()));
} }
wali.update(Collections.singletonList(record), false); wali.update(Collections.singletonList(record), false);
@ -98,8 +87,6 @@ public class PersistentMapCache implements MapCache {
wali.checkpoint(); wali.checkpoint();
} }
} }
return putResult;
} }
@Override @Override
@ -112,6 +99,18 @@ public class PersistentMapCache implements MapCache {
return wrapped.get(key); return wrapped.get(key);
} }
@Override
public MapCacheRecord fetch(ByteBuffer key) throws IOException {
return wrapped.fetch(key);
}
@Override
public MapPutResult replace(MapCacheRecord record) throws IOException {
final MapPutResult putResult = wrapped.replace(record);
putWriteAheadLog(record.getKey(), record.getValue(), putResult);
return putResult;
}
@Override @Override
public ByteBuffer remove(final ByteBuffer key) throws IOException { public ByteBuffer remove(final ByteBuffer key) throws IOException {
final ByteBuffer removeResult = wrapped.remove(key); final ByteBuffer removeResult = wrapped.remove(key);

View File

@ -84,16 +84,7 @@ public class SimpleMapCache implements MapCache {
final MapCacheRecord record = cache.get(key); final MapCacheRecord record = cache.get(key);
if (record == null) { if (record == null) {
// Record is null. We will add. // Record is null. We will add.
final MapCacheRecord evicted = evict(); return put(key, value, record);
final MapCacheRecord newRecord = new MapCacheRecord(key, value);
cache.put(key, newRecord);
inverseCacheMap.put(newRecord, key);
if (evicted == null) {
return new MapPutResult(true, key, value, null, null, null);
} else {
return new MapPutResult(true, key, value, null, evicted.getKey(), evicted.getValue());
}
} }
// Record is not null. Increment hit count and return result indicating that record was not added. // Record is not null. Increment hit count and return result indicating that record was not added.
@ -101,29 +92,37 @@ public class SimpleMapCache implements MapCache {
record.hit(); record.hit();
inverseCacheMap.put(record, key); inverseCacheMap.put(record, key);
return new MapPutResult(false, key, value, record.getValue(), null, null); return new MapPutResult(false, record, record, null);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
} }
private MapPutResult put(final ByteBuffer key, final ByteBuffer value, final MapCacheRecord existing) {
// evict if we need to in order to make room for a new entry.
final MapCacheRecord evicted = evict();
final long revision;
if (existing == null) {
revision = 0;
} else {
revision = existing.getRevision() + 1;
inverseCacheMap.remove(existing);
}
final MapCacheRecord record = new MapCacheRecord(key, value, revision);
cache.put(key, record);
inverseCacheMap.put(record, key);
return new MapPutResult(true, record, existing, evicted);
}
@Override @Override
public MapPutResult put(final ByteBuffer key, final ByteBuffer value) { public MapPutResult put(final ByteBuffer key, final ByteBuffer value) throws IOException {
writeLock.lock(); writeLock.lock();
try { try {
// evict if we need to in order to make room for a new entry. final MapCacheRecord existing = cache.get(key);
final MapCacheRecord evicted = evict(); return put(key, value, existing);
final MapCacheRecord record = new MapCacheRecord(key, value);
final MapCacheRecord existing = cache.put(key, record);
inverseCacheMap.put(record, key);
final ByteBuffer existingValue = (existing == null) ? null : existing.getValue();
final ByteBuffer evictedKey = (evicted == null) ? null : evicted.getKey();
final ByteBuffer evictedValue = (evicted == null) ? null : evicted.getValue();
return new MapPutResult(true, key, value, existingValue, evictedKey, evictedValue);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }
@ -182,6 +181,43 @@ public class SimpleMapCache implements MapCache {
} }
} }
@Override
public MapCacheRecord fetch(ByteBuffer key) throws IOException {
readLock.lock();
try {
final MapCacheRecord record = cache.get(key);
if (record == null) {
return null;
}
inverseCacheMap.remove(record);
record.hit();
inverseCacheMap.put(record, key);
return record;
} finally {
readLock.unlock();
}
}
@Override
public MapPutResult replace(MapCacheRecord inputRecord) throws IOException {
writeLock.lock();
try {
final ByteBuffer key = inputRecord.getKey();
final ByteBuffer value = inputRecord.getValue();
final MapCacheRecord existing = fetch(key);
if (existing != null && inputRecord.getRevision() != existing.getRevision()) {
// The key has been updated by other operation.
return new MapPutResult(false, inputRecord, existing, null);
}
return put(key, value, existing);
} finally {
writeLock.unlock();
}
}
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
} }

View File

@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
@ -33,14 +34,17 @@ import java.util.Map;
import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService;
import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService; import org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService;
import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException; import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer; import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceInitializationContext; import org.apache.nifi.util.MockControllerServiceInitializationContext;
@ -52,6 +56,8 @@ import org.mockito.Mockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
public class TestServerAndClient { public class TestServerAndClient {
private static Logger LOGGER; private static Logger LOGGER;
@ -450,6 +456,157 @@ public class TestServerAndClient {
server.shutdownServer(); server.shutdownServer();
} }
@Test
public void testOptimisticLock() throws Exception {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
final DistributedMapCacheServer server = new MapServer();
final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
runner.addControllerService("server", server);
runner.enableControllerService(server);
DistributedMapCacheClientService client1 = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client1, "client1");
client1.initialize(clientInitContext1);
DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
client1.initialize(clientInitContext2);
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
MockConfigurationContext clientContext1 = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
client1.cacheConfig(clientContext1);
MockConfigurationContext clientContext2 = new MockConfigurationContext(clientProperties, clientInitContext2.getControllerServiceLookup());
client2.cacheConfig(clientContext2);
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final String key = "test-optimistic-lock";
// Ensure there's no existing key
assertFalse(client1.containsKey(key, stringSerializer));
assertNull(client1.fetch(key, stringSerializer, stringDeserializer));
// Client 1 inserts the key.
client1.put(key, "valueC1-0", stringSerializer, stringSerializer);
// Client 1 and 2 fetch the key
AtomicDistributedMapCacheClient.CacheEntry<String, String> c1 = client1.fetch(key, stringSerializer, stringDeserializer);
AtomicDistributedMapCacheClient.CacheEntry<String, String> c2 = client2.fetch(key, stringSerializer, stringDeserializer);
assertEquals(0, c1.getRevision());
assertEquals("valueC1-0", c1.getValue());
assertEquals(0, c2.getRevision());
assertEquals("valueC1-0", c2.getValue());
// Client 1 replace
boolean c1Result = client1.replace(key, "valueC1-1", stringSerializer, stringSerializer, c1.getRevision());
assertTrue("C1 should be able to replace the key", c1Result);
// Client 2 replace with the old revision
boolean c2Result = client2.replace(key, "valueC2-1", stringSerializer, stringSerializer, c2.getRevision());
assertFalse("C2 shouldn't be able to replace the key", c2Result);
// Client 2 fetch the key again
c2 = client2.fetch(key, stringSerializer, stringDeserializer);
assertEquals("valueC1-1", c2.getValue());
assertEquals(1, c2.getRevision());
// Now, Client 2 knows the correct revision so it can replace the key
c2Result = client2.replace(key, "valueC2-2", stringSerializer, stringSerializer, c2.getRevision());
assertTrue("C2 should be able to replace the key", c2Result);
// Assert the cache
c2 = client2.fetch(key, stringSerializer, stringDeserializer);
assertEquals("valueC2-2", c2.getValue());
assertEquals(2, c2.getRevision());
client1.close();
client2.close();
server.shutdownServer();
}
@Test
public void testBackwardCompatibility() throws Exception {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
// Create a server that only supports protocol version 1.
final DistributedMapCacheServer server = new MapServer() {
@Override
protected MapCacheServer createMapCacheServer(int port, int maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) throws IOException {
return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, evictionPolicy, persistenceDir) {
@Override
protected StandardVersionNegotiator getVersionNegotiator() {
return new StandardVersionNegotiator(1);
}
};
}
};
runner.addControllerService("server", server);
runner.enableControllerService(server);
DistributedMapCacheClientService client = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext1 = new MockControllerServiceInitializationContext(client, "client");
client.initialize(clientInitContext1);
final Map<PropertyDescriptor, String> clientProperties = new HashMap<>();
clientProperties.put(DistributedMapCacheClientService.HOSTNAME, "localhost");
clientProperties.put(DistributedMapCacheClientService.PORT, String.valueOf(server.getPort()));
clientProperties.put(DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "360 secs");
MockConfigurationContext clientContext = new MockConfigurationContext(clientProperties, clientInitContext1.getControllerServiceLookup());
client.cacheConfig(clientContext);
final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer();
final String key = "test-backward-compatibility";
// Version 1 operations should work
client.put(key, "value1", stringSerializer, stringSerializer);
assertEquals("value1", client.get(key, stringSerializer, stringDeserializer));
assertTrue(client.containsKey(key, stringSerializer));
try {
client.fetch(key, stringSerializer, stringDeserializer);
fail("Version 2 operations should NOT work.");
} catch (UnsupportedOperationException e) {
}
try {
client.replace(key, "value2", stringSerializer, stringSerializer, 0L);
fail("Version 2 operations should NOT work.");
} catch (UnsupportedOperationException e) {
}
client.close();
server.shutdownServer();
}
private void waitABit() { private void waitABit() {
try { try {
Thread.sleep(10L); Thread.sleep(10L);

View File

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.distributed.cache.server.map;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class TestSimpleMapCache {
@Test
public void testBasicOperations() throws Exception {
final SimpleMapCache cache = new SimpleMapCache("service-id", 2, EvictionPolicy.FIFO);
final ByteBuffer key1 = ByteBuffer.wrap("key1".getBytes());
final ByteBuffer key2 = ByteBuffer.wrap("key2".getBytes());
final ByteBuffer key3 = ByteBuffer.wrap("key3".getBytes());
ByteBuffer value1 = ByteBuffer.wrap("value1-0".getBytes());
ByteBuffer value2 = ByteBuffer.wrap("value2-0".getBytes());
ByteBuffer value3 = ByteBuffer.wrap("value3-0".getBytes());
// Initial state.
assertNull(cache.get(key1));
assertNull(cache.fetch(key1));
// Put the 1st key.
MapPutResult putResult = cache.put(key1, value1);
assertTrue(putResult.isSuccessful());
assertNull(putResult.getExisting());
assertNull(putResult.getEvicted());
assertEquals(0, putResult.getRecord().getRevision());
// Update the same key.
value1 = ByteBuffer.wrap("value1-1".getBytes());
putResult = cache.put(key1, value1);
assertTrue(putResult.isSuccessful());
assertNotNull(putResult.getExisting());
assertEquals(1, putResult.getRecord().getRevision());
assertEquals(key1, putResult.getExisting().getKey());
assertEquals("value1-0", new String(putResult.getExisting().getValue().array()));
assertNull(putResult.getEvicted());
// Put the 2nd key.
putResult = cache.put(key2, value2);
assertTrue(putResult.isSuccessful());
assertNull(putResult.getExisting());
assertNull(putResult.getEvicted());
assertEquals(0, putResult.getRecord().getRevision());
// Put the 3rd key.
putResult = cache.put(key3, value3);
assertTrue(putResult.isSuccessful());
assertNull(putResult.getExisting());
assertNotNull("The first key should be evicted", putResult.getEvicted());
assertEquals("key1", new String(putResult.getEvicted().getKey().array()));
assertEquals("value1-1", new String(putResult.getEvicted().getValue().array()));
assertEquals(0, putResult.getRecord().getRevision());
// Delete 2nd key.
ByteBuffer removed = cache.remove(key2);
assertNotNull(removed);
assertEquals("value2-0", new String(removed.array()));
// Put the 2nd key again.
putResult = cache.put(key2, value2);
assertTrue(putResult.isSuccessful());
assertNull(putResult.getExisting());
assertNull(putResult.getEvicted());
assertEquals("Revision should start from 0", 0, putResult.getRecord().getRevision());
}
@Test
public void testOptimisticLock() throws Exception {
final SimpleMapCache cache = new SimpleMapCache("service-id", 2, EvictionPolicy.FIFO);
final ByteBuffer key = ByteBuffer.wrap("key1".getBytes());
ByteBuffer valueC1 = ByteBuffer.wrap("valueC1-0".getBytes());
ByteBuffer valueC2 = ByteBuffer.wrap("valueC2-0".getBytes());
assertNull("If there's no existing key, fetch should return null.", cache.fetch(key));
// Client 1 inserts the key.
MapCacheRecord c1 = new MapCacheRecord(key, valueC1);
MapPutResult putResult = cache.replace(c1);
assertTrue("Replace should succeed if there's no existing key.", putResult.isSuccessful());
MapCacheRecord c2 = new MapCacheRecord(key, valueC2);
putResult = cache.replace(c2);
assertFalse("Replace should fail.", putResult.isSuccessful());
// Client 1 and 2 fetch the key
c1 = cache.fetch(key);
c2 = cache.fetch(key);
assertEquals(0, c1.getRevision());
assertEquals(0, c2.getRevision());
// Client 1 replace
valueC1 = ByteBuffer.wrap("valueC1-1".getBytes());
putResult = cache.replace(new MapCacheRecord(key, valueC1, c1.getRevision()));
assertTrue("Replace should succeed since revision matched.", putResult.isSuccessful());
assertEquals(1, putResult.getRecord().getRevision());
// Client 2 replace with the old revision
valueC2 = ByteBuffer.wrap("valueC2-1".getBytes());
putResult = cache.replace(new MapCacheRecord(key, valueC2, c2.getRevision()));
assertFalse("Replace should fail.", putResult.isSuccessful());
}
}