NIFI-3627: Added removeByPattern() to DistributedMapCache interfaces

NIFI-3627: Updated unit tests that use MapCache interface(s)

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #1609.
This commit is contained in:
Matt Burgess 2017-03-21 18:15:54 -04:00 committed by Pierre Villard
parent 0b73715562
commit d1ebddce98
14 changed files with 226 additions and 62 deletions

View File

@ -43,6 +43,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -51,6 +52,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@ -485,5 +488,22 @@ public class TestListHDFS {
values.remove(key);
return true;
}
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}
}

View File

@ -33,6 +33,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.state.Scope;
@ -493,6 +495,23 @@ public class TestGetHBase {
values.remove(key);
return true;
}
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}
}

View File

@ -29,6 +29,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.Charsets;
import org.apache.nifi.components.PropertyDescriptor;
@ -445,6 +447,22 @@ public class TestAbstractListProcessor {
final Object value = stored.remove(key);
return value != null;
}
@Override
public long removeByPattern(String regex) throws IOException {
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : stored.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(stored.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(stored::remove);
return numRemoved;
}
}

View File

@ -242,6 +242,11 @@ public class TestDetectDuplicate {
return true;
}
@Override
public long removeByPattern(String regex) throws IOException {
return exists ? 1L : 0L;
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
cacheValue = value;

View File

@ -28,10 +28,14 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TestFetchDistributedMapCache {
@ -210,6 +214,23 @@ public class TestFetchDistributedMapCache {
values.remove(key);
return true;
}
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}

View File

@ -30,10 +30,14 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -374,6 +378,23 @@ public class TestNotify {
return values.remove(key) != null;
}
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
@Override
@SuppressWarnings("unchecked")
public <K, V> CacheEntry<K, V> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {

View File

@ -19,10 +19,14 @@ package org.apache.nifi.processors.standard;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.Deserializer;
@ -271,6 +275,23 @@ public class TestPutDistributedMapCache {
values.remove(key);
return true;
}
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}

View File

@ -137,4 +137,12 @@ public interface DistributedMapCacheClient extends ControllerService {
*/
<K> boolean remove(K key, Serializer<K> serializer) throws IOException;
/**
* Removes entries whose keys match the specified pattern
*
* @param regex The regular expression / pattern on which to match the keys to be removed
* @return The number of entries that were removed
* @throws IOException if any error occurred while removing an entry
*/
long removeByPattern(String regex) throws IOException;
}

View File

@ -216,6 +216,20 @@ public class DistributedMapCacheClientService extends AbstractControllerService
});
}
@Override
public long removeByPattern(String regex) throws IOException {
return withCommsSession(session -> {
final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
dos.writeUTF("removeByPattern");
dos.writeUTF(regex);
dos.flush();
// read response
final DataInputStream dis = new DataInputStream(session.getInputStream());
return dis.readLong();
});
}
@Override
public <K, V> CacheEntry<K, V> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withCommsSession(session -> {

View File

@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server.map;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
public interface MapCache {
@ -31,6 +32,8 @@ public interface MapCache {
ByteBuffer remove(ByteBuffer key) throws IOException;
Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException;
MapCacheRecord fetch(ByteBuffer key) throws IOException;
MapPutResult replace(MapCacheRecord record) throws IOException;

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import javax.net.ssl.SSLContext;
@ -126,6 +127,12 @@ public class MapCacheServer extends AbstractCacheServer {
dos.writeBoolean(removed);
break;
}
case "removeByPattern": {
final String pattern = dis.readUTF();
final Map<ByteBuffer, ByteBuffer> removed = cache.removeByPattern(pattern);
dos.writeLong(removed == null ? 0 : removed.size());
break;
}
case "fetch": {
final byte[] key = readValue(dis);
final MapCacheRecord existing = cache.fetch(ByteBuffer.wrap(key));

View File

@ -127,6 +127,25 @@ public class PersistentMapCache implements MapCache {
return removeResult;
}
@Override
public Map<ByteBuffer, ByteBuffer> removeByPattern(final String regex) throws IOException {
final Map<ByteBuffer, ByteBuffer> removeResult = wrapped.removeByPattern(regex);
if (removeResult != null) {
final List<MapWaliRecord> records = new ArrayList<>(removeResult.size());
for(Map.Entry<ByteBuffer, ByteBuffer> entry : removeResult.entrySet()) {
final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, entry.getKey(), entry.getValue());
records.add(record);
wali.update(records, false);
final long modCount = modifications.getAndIncrement();
if (modCount > 0 && modCount % 1000 == 0) {
wali.checkpoint();
}
}
}
return removeResult;
}
@Override
public void shutdown() throws IOException {
wali.shutdown();

View File

@ -19,13 +19,17 @@ package org.apache.nifi.distributed.cache.server.map;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.distributed.cache.server.EvictionPolicy;
@ -181,6 +185,32 @@ public class SimpleMapCache implements MapCache {
}
}
@Override
public Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException {
writeLock.lock();
try {
final Map<ByteBuffer, ByteBuffer> removedMap = new HashMap<>();
final List<MapCacheRecord> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (ByteBuffer key : cache.keySet()) {
// Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset
Matcher m = p.matcher(new String(key.array()));
if (m.matches()) {
removedRecords.add(cache.get(key));
}
}
removedRecords.forEach((record) -> {
cache.remove(record.getKey());
inverseCacheMap.remove(record);
removedMap.put(record.getKey(), record.getValue());
});
return removedMap;
} finally {
writeLock.unlock();
}
}
@Override
public MapCacheRecord fetch(ByteBuffer key) throws IOException {
readLock.lock();

View File

@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
@ -50,7 +49,6 @@ import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.MockControllerServiceInitializationContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
@ -75,13 +73,6 @@ public class TestServerAndClient {
@Test
public void testNonPersistentSetServerAndClient() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));
@ -111,12 +102,6 @@ public class TestServerAndClient {
@Test
public void testPersistentSetServerAndClient() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
@ -169,12 +154,6 @@ public class TestServerAndClient {
@Test
public void testPersistentSetServerAndClientWithLFUEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
@ -237,12 +216,6 @@ public class TestServerAndClient {
@Test
public void testPersistentMapServerAndClientWithLFUEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
@ -298,18 +271,22 @@ public class TestServerAndClient {
assertFalse(client.containsKey("test3", serializer));
assertTrue(client.containsKey("test4", serializer));
// Test removeByPattern, the first two should be removed and the last should remain
client.put("test.1", "1", serializer, serializer);
client.put("test.2", "2", serializer, serializer);
client.put("test3", "2", serializer, serializer);
final long removedTwo = client.removeByPattern("test\\..*");
assertEquals(2L, removedTwo);
assertFalse(client.containsKey("test.1", serializer));
assertFalse(client.containsKey("test.2", serializer));
assertTrue(client.containsKey("test3", serializer));
newServer.shutdownServer();
client.close();
}
@Test
public void testPersistentSetServerAndClientWithFIFOEvictions() throws InitializationException, IOException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
@ -379,12 +356,6 @@ public class TestServerAndClient {
@Test
public void testNonPersistentMapServerAndClient() throws InitializationException, IOException, InterruptedException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
@ -428,6 +399,16 @@ public class TestServerAndClient {
assertTrue(removed);
LOGGER.debug("end remove");
// Test removeByPattern, the first two should be removed and the last should remain
client.put("test.1", "1", keySerializer, keySerializer);
client.put("test.2", "2", keySerializer, keySerializer);
client.put("test3", "2", keySerializer, keySerializer);
final long removedTwo = client.removeByPattern("test\\..*");
assertEquals(2L, removedTwo);
assertFalse(client.containsKey("test.1", keySerializer));
assertFalse(client.containsKey("test.2", keySerializer));
assertTrue(client.containsKey("test3", keySerializer));
final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
assertFalse(containedAfterRemove);
@ -439,9 +420,6 @@ public class TestServerAndClient {
} catch (final Exception e) {
}
client = null;
clientInitContext = null;
clientContext = null;
DistributedMapCacheClientService client2 = new DistributedMapCacheClientService();
MockControllerServiceInitializationContext clientInitContext2 = new MockControllerServiceInitializationContext(client2, "client2");
@ -468,12 +446,6 @@ public class TestServerAndClient {
@Test
public void testClientTermination() throws InitializationException, IOException, InterruptedException {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("testClientTermination is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
final DistributedMapCacheServer server = new MapServer();
@ -526,13 +498,6 @@ public class TestServerAndClient {
@Test
public void testOptimisticLock() throws Exception {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
// Create server
@ -608,13 +573,6 @@ public class TestServerAndClient {
@Test
public void testBackwardCompatibility() throws Exception {
/**
* This bypasses the test for build environments in OS X running Java 1.8 due to a JVM bug
* See: https://issues.apache.org/jira/browse/NIFI-437
*/
Assume.assumeFalse("test is skipped due to build environment being OS X with JDK 1.8. See https://issues.apache.org/jira/browse/NIFI-437",
SystemUtils.IS_OS_MAC && SystemUtils.IS_JAVA_1_8);
LOGGER.info("Testing " + Thread.currentThread().getStackTrace()[1].getMethodName());
final TestRunner runner = TestRunners.newTestRunner(Mockito.mock(Processor.class));