NIFI-4504, NIFI-4505 added removeAndGet, removeByPatternAndGet, and keySet methods to MapCache API

cleaned up some warnings on deprecated nifi.stream.io classes

This closes #2284.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Mike Moser 2017-11-15 21:54:46 +00:00 committed by Koji Kawamura
parent 2fbe922a2b
commit c59a967623
13 changed files with 246 additions and 8 deletions

View File

@ -244,7 +244,12 @@ public class TestDetectDuplicate {
@Override @Override
public long removeByPattern(String regex) throws IOException { public long removeByPattern(String regex) throws IOException {
return exists ? 1L : 0L; if (exists) {
exists = false;
return 1L;
} else {
return 0L;
}
} }
@Override @Override

View File

@ -166,6 +166,23 @@ public interface DistributedMapCacheClient extends ControllerService {
*/ */
<K> boolean remove(K key, Serializer<K> serializer) throws IOException; <K> boolean remove(K key, Serializer<K> serializer) throws IOException;
/**
* Removes the entry with the given key from the cache, if it is present,
* and returns the value that was removed from the map.
*
* @param <K> type of key
* @param <V> type of value
* @param key key
* @param keySerializer key serializer
* @param valueDeserializer value deserializer
* @return the value previously associated with the key, or null if there was no mapping
* null can also indicate that the map previously associated null with the key
* @throws IOException ex
*/
default <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
throw new UnsupportedOperationException();
}
/** /**
* Removes entries whose keys match the specified pattern * Removes entries whose keys match the specified pattern
* *
@ -174,4 +191,32 @@ public interface DistributedMapCacheClient extends ControllerService {
* @throws IOException if any error occurred while removing an entry * @throws IOException if any error occurred while removing an entry
*/ */
long removeByPattern(String regex) throws IOException; long removeByPattern(String regex) throws IOException;
/**
* Removes entries whose keys match the specified pattern, and returns a map of entries that
* were removed.
*
* @param <K> type of key
* @param <V> type of value
* @param regex The regular expression / pattern on which to match the keys to be removed
* @param keyDeserializer key deserializer
* @param valueDeserializer value deserializer
* @return A map of key/value entries that were removed from the cache
* @throws IOException if any error occurred while removing an entry
*/
default <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException {
throw new UnsupportedOperationException();
}
/**
* Returns a set of all keys currently in the cache
*
* @param <K> type of key
* @param keyDeserializer key deserializer
* @return a Set of all keys currently in the cache
* @throws IOException ex
*/
default <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException {
throw new UnsupportedOperationException();
}
} }

View File

@ -22,6 +22,7 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -255,6 +256,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}); });
} }
@Override
public <K, V> V removeAndGet(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(new CommsAction<V>() {
@Override
public V execute(final CommsSession session) throws IOException {
validateProtocolVersion(session, 3);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("removeAndGet");
serialize(key, keySerializer, dos);
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
final byte[] responseBuffer = readLengthDelimitedResponse(dis);
return valueDeserializer.deserialize(responseBuffer);
}
});
}
@Override @Override
public long removeByPattern(String regex) throws IOException { public long removeByPattern(String regex) throws IOException {
return withCommsSession(session -> { return withCommsSession(session -> {
@ -269,6 +291,34 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}); });
} }
@Override
public <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(new CommsAction<Map<K, V>>() {
@Override
public Map<K, V> execute(CommsSession session) throws IOException {
validateProtocolVersion(session, 3);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("removeByPatternAndGet");
dos.writeUTF(regex);
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
final int mapSize = dis.readInt();
HashMap<K, V> resultMap = new HashMap<>(mapSize);
for (int i=0; i<mapSize; i++) {
final byte[] keyBuffer = readLengthDelimitedResponse(dis);
K key = keyDeserializer.deserialize(keyBuffer);
final byte[] valueBuffer = readLengthDelimitedResponse(dis);
V value = valueDeserializer.deserialize(valueBuffer);
resultMap.put(key, value);
}
return resultMap;
}
});
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {
@ -321,6 +371,27 @@ public class DistributedMapCacheClientService extends AbstractControllerService
}); });
} }
@Override
public <K> Set<K> keySet(Deserializer<K> keyDeserializer) throws IOException {
return withCommsSession(session -> {
validateProtocolVersion(session, 3);
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("keySet");
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
final int setSize = dis.readInt();
HashSet<K> resultSet = new HashSet<>(setSize);
for (int i=0; i<setSize; i++) {
final byte[] responseBuffer = readLengthDelimitedResponse(dis);
resultSet.add(keyDeserializer.deserialize(responseBuffer));
}
return resultSet;
});
}
private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException { private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
final int responseLength = dis.readInt(); final int responseLength = dis.readInt();
final byte[] responseBuffer = new byte[responseLength]; final byte[] responseBuffer = new byte[responseLength];

View File

@ -16,7 +16,9 @@
*/ */
package org.apache.nifi.distributed.cache.client; package org.apache.nifi.distributed.cache.client;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -40,8 +42,6 @@ import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator; import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.ssl.SSLContextService.ClientAuth; import org.apache.nifi.ssl.SSLContextService.ClientAuth;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.distributed.cache.client; package org.apache.nifi.distributed.cache.client;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -25,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream; import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.nifi.distributed.cache.client; package org.apache.nifi.distributed.cache.client;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -25,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.remote.io.InterruptableInputStream; import org.apache.nifi.remote.io.InterruptableInputStream;
import org.apache.nifi.remote.io.InterruptableOutputStream; import org.apache.nifi.remote.io.InterruptableOutputStream;
import org.apache.nifi.remote.io.socket.SocketChannelInputStream; import org.apache.nifi.remote.io.socket.SocketChannelInputStream;

View File

@ -38,6 +38,7 @@ public class ProtocolHandshake {
* If the server doesn't support requested protocol version, HandshakeException will be thrown.</p> * If the server doesn't support requested protocol version, HandshakeException will be thrown.</p>
* *
* <p>DistributedMapCache version histories:<ul> * <p>DistributedMapCache version histories:<ul>
* <li>3: Added subMap, keySet, removeAndGet, removeByPatternAndGet methods.</li>
* <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li> * <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li>
* <li>1: Initial version.</li> * <li>1: Initial version.</li>
* </ul></p> * </ul></p>

View File

@ -17,6 +17,7 @@
package org.apache.nifi.distributed.cache.server; package org.apache.nifi.distributed.cache.server;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -29,7 +30,6 @@ import org.apache.nifi.distributed.cache.server.set.PersistentSetCache;
import org.apache.nifi.distributed.cache.server.set.SetCache; import org.apache.nifi.distributed.cache.server.set.SetCache;
import org.apache.nifi.distributed.cache.server.set.SetCacheResult; import org.apache.nifi.distributed.cache.server.set.SetCacheResult;
import org.apache.nifi.distributed.cache.server.set.SimpleSetCache; import org.apache.nifi.distributed.cache.server.set.SimpleSetCache;
import org.apache.nifi.stream.io.DataOutputStream;
public class SetCacheServer extends AbstractCacheServer { public class SetCacheServer extends AbstractCacheServer {

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
public interface MapCache { public interface MapCache {
@ -41,5 +42,7 @@ public interface MapCache {
MapPutResult replace(MapCacheRecord record) throws IOException; MapPutResult replace(MapCacheRecord record) throws IOException;
Set<ByteBuffer> keySet() throws IOException;
void shutdown() throws IOException; void shutdown() throws IOException;
} }

View File

@ -24,6 +24,7 @@ import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map; import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
@ -144,12 +145,47 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeBoolean(removed); dos.writeBoolean(removed);
break; break;
} }
case "removeAndGet": {
final byte[] key = readValue(dis);
final ByteBuffer removed = cache.remove(ByteBuffer.wrap(key));
if (removed == null) {
// there was no value removed
dos.writeInt(0);
} else {
// reply with the value that was removed
final byte[] byteArray = removed.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
case "removeByPattern": { case "removeByPattern": {
final String pattern = dis.readUTF(); final String pattern = dis.readUTF();
final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern); final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
dos.writeLong(removed == null ? 0 : removed.size()); dos.writeLong(removed == null ? 0 : removed.size());
break; break;
} }
case "removeByPatternAndGet": {
final String pattern = dis.readUTF();
final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
if (removed == null || removed.size() == 0) {
dos.writeLong(0);
} else {
// write the map size
dos.writeInt(removed.size());
for (Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
// write map entry key
final byte[] key = entry.getKey().array();
dos.writeInt(key.length);
dos.write(key);
// write map entry value
final byte[] value = entry.getValue().array();
dos.writeInt(value.length);
dos.write(value);
}
}
break;
}
case "fetch": { case "fetch": {
final byte[] key = readValue(dis); final byte[] key = readValue(dis);
final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key)); final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));
@ -175,6 +211,18 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeBoolean(result.isSuccessful()); dos.writeBoolean(result.isSuccessful());
break; break;
} }
case "keySet": {
final Set<ByteBuffer> result = cache.keySet();
// write the set size
dos.writeInt(result.size());
// write each key in the set
for (ByteBuffer bb : result) {
final byte[] byteArray = bb.array();
dos.writeInt(byteArray.length);
dos.write(byteArray);
}
break;
}
default: { default: {
throw new IOException("Illegal Request"); throw new IOException("Illegal Request");
} }

View File

@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -170,6 +171,11 @@ public class PersistentMapCache implements MapCache {
return removeResult; return removeResult;
} }
@Override
public Set<ByteBuffer> keySet() throws IOException {
return wrapped.keySet();
}
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
wali.shutdown(); wali.shutdown();

View File

@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -273,6 +274,16 @@ public class SimpleMapCache implements MapCache {
} }
} }
@Override
public Set<ByteBuffer> keySet() throws IOException {
readLock.lock();
try {
return cache.keySet();
} finally {
readLock.unlock();
}
}
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
} }

View File

@ -30,6 +30,7 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SystemUtils; import org.apache.commons.lang3.SystemUtils;
@ -274,6 +275,13 @@ public class TestServerAndClient {
assertTrue(contains); assertTrue(contains);
assertTrue(contains2); assertTrue(contains2);
final Deserializer<String> deserializer = new StringDeserializer();
final Set<String> keys = client.keySet(deserializer);
assertEquals(3, keys.size());
assertTrue(keys.contains("test"));
assertTrue(keys.contains("test2"));
assertTrue(keys.contains("test3"));
final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer); final boolean addedAgain = client.putIfAbsent("test", "1", serializer, serializer);
assertFalse(addedAgain); assertFalse(addedAgain);
@ -307,6 +315,19 @@ public class TestServerAndClient {
assertFalse(client.containsKey("test.2", serializer)); assertFalse(client.containsKey("test.2", serializer));
assertTrue(client.containsKey("test3", serializer)); assertTrue(client.containsKey("test3", serializer));
// test removeByPatternAndGet
client.put("test.1", "1", serializer, serializer);
client.put("test.2", "2", serializer, serializer);
Map<String,String> removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
assertEquals(2, removed.size());
assertTrue(removed.containsKey("test.1"));
assertTrue(removed.containsKey("test.2"));
assertFalse(client.containsKey("test.1", serializer));
assertFalse(client.containsKey("test.2", serializer));
assertTrue(client.containsKey("test3", serializer));
removed = client.removeByPatternAndGet("test\\..*", deserializer, deserializer);
assertEquals(0, removed.size());
newServer.shutdownServer(); newServer.shutdownServer();
client.close(); client.close();
} }
@ -437,6 +458,16 @@ public class TestServerAndClient {
assertTrue(removed); assertTrue(removed);
LOGGER.debug("end remove"); LOGGER.debug("end remove");
client.put("testKey", "testValue", keySerializer, valueSerializer);
assertTrue(client.containsKey("testKey", keySerializer));
String removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
assertEquals("testValue", removedValue);
removedValue = client.removeAndGet("testKey", keySerializer, deserializer);
assertNull(removedValue);
final Set<String> keys = client.keySet(deserializer);
assertEquals(0, keys.size());
// Test removeByPattern, the first two should be removed and the last should remain // Test removeByPattern, the first two should be removed and the last should remain
client.put("test.1", "1", keySerializer, keySerializer); client.put("test.1", "1", keySerializer, keySerializer);
client.put("test.2", "2", keySerializer, keySerializer); client.put("test.2", "2", keySerializer, keySerializer);
@ -687,6 +718,23 @@ public class TestServerAndClient {
} catch (UnsupportedOperationException e) { } catch (UnsupportedOperationException e) {
} }
try {
Set<String> keys = client.keySet(stringDeserializer);
fail("Version 3 operations should NOT work.");
} catch (UnsupportedOperationException e) {
}
try {
String removed = client.removeAndGet("v.*", stringSerializer, stringDeserializer);
fail("Version 3 operations should NOT work.");
} catch (UnsupportedOperationException e) {
}
try {
Map<String, String> removed = client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer);
fail("Version 3 operations should NOT work.");
} catch (UnsupportedOperationException e) {
}
client.close(); client.close();
server.shutdownServer(); server.shutdownServer();
} }