NIFI-12473 Deleted removeByPattern Methods from Cache Services

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8124.
This commit is contained in:
exceptionfactory 2023-12-05 13:09:40 -06:00 committed by Pierre Villard
parent 58f1e97c41
commit 34aebc1f69
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
30 changed files with 57 additions and 496 deletions

View File

@ -70,9 +70,4 @@ public class MockDistributedMapCacheClient extends AbstractControllerService imp
public <K> boolean remove(K key, Serializer<K> serializer) { public <K> boolean remove(K key, Serializer<K> serializer) {
throw new NotImplementedException(); throw new NotImplementedException();
} }
@Override
public long removeByPattern(String regex) {
throw new NotImplementedException();
}
} }

View File

@ -223,11 +223,6 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl
return true; return true;
} }
@Override
public long removeByPattern(String s) throws IOException {
throw new UnsupportedOperationException();
}
private <K> byte[] serializeKey(K k, Serializer<K> keySerializer) throws IOException { private <K> byte[] serializeKey(K k, Serializer<K> keySerializer) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream(); ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(k, out); keySerializer.serialize(k, out);

View File

@ -24,10 +24,6 @@ import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.error.CASMismatchException; import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.DocumentAlreadyExistsException; import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import com.couchbase.client.java.error.DocumentDoesNotExistException; import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.query.Delete;
import com.couchbase.client.java.query.N1qlQuery;
import com.couchbase.client.java.query.N1qlQueryResult;
import com.couchbase.client.java.query.Statement;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.annotation.lifecycle.OnEnabled;
@ -38,27 +34,21 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static com.couchbase.client.java.query.dsl.functions.PatternMatchingFunctions.regexpContains;
import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME;
import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE;
// TODO: Doc
@Tags({"distributed", "cache", "map", "cluster", "couchbase"}) @Tags({"distributed", "cache", "map", "cluster", "couchbase"})
@CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." + @CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." +
" This can be used in order to share a Map between nodes in a NiFi cluster." + " This can be used in order to share a Map between nodes in a NiFi cluster." +
" Couchbase Server cluster can provide a high available and persistent cache storage.") " Couchbase Server cluster can provide a high available and persistent cache storage.")
public class CouchbaseMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> { public class CouchbaseMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
private static final Logger logger = LoggerFactory.getLogger(CouchbaseMapCacheClient.class);
private CouchbaseClusterControllerService clusterService; private CouchbaseClusterControllerService clusterService;
private Bucket bucket; private Bucket bucket;
@ -196,7 +186,7 @@ public class CouchbaseMapCacheClient extends AbstractControllerService implement
} }
@Override @Override
public void close() throws IOException { public void close() {
} }
@Override @Override
@ -208,14 +198,4 @@ public class CouchbaseMapCacheClient extends AbstractControllerService implement
return false; return false;
} }
} }
@Override
public long removeByPattern(String regex) throws IOException {
Statement statement = Delete.deleteFromCurrentBucket().where(regexpContains("meta().id", regex));
final N1qlQueryResult result = bucket.query(N1qlQuery.simple(statement));
if (logger.isDebugEnabled()) {
logger.debug("Deleted documents using regex {}, result={}", regex, result);
}
return result.info().mutationCount();
}
} }

View File

@ -29,12 +29,6 @@
Following cache operations require N1QL query, thus you need to deploy Couchbase Server 4.0 or higher for those operations. However, as of this writing (May 2017) there are only few processors using these operations. Most cache APIs are implemented using document id lookup and should work with older version of Couchbase Server. Following cache operations require N1QL query, thus you need to deploy Couchbase Server 4.0 or higher for those operations. However, as of this writing (May 2017) there are only few processors using these operations. Most cache APIs are implemented using document id lookup and should work with older version of Couchbase Server.
<ul>
<li>removeByPattern(String regex): This cache API removes entries by regex. Execute query like:
<pre>delete from `cache-bucket-name` where REGEX_CONTAINS(meta().id, "^key.*")</pre>
</li>
</ul>
In order to make N1QL work correctly you need to create a <a href="https://developer.couchbase.com/documentation/server/current/n1ql/n1ql-language-reference/createprimaryindex.html">Primary index</a> or an index covering N1QL queries performed by CouchbaseMapCacheClient. Please refer Couchbase Server documentations for how to create those. In order to make N1QL work correctly you need to create a <a href="https://developer.couchbase.com/documentation/server/current/n1ql/n1ql-language-reference/createprimaryindex.html">Primary index</a> or an index covering N1QL queries performed by CouchbaseMapCacheClient. Please refer Couchbase Server documentations for how to create those.
</body> </body>

View File

@ -66,8 +66,6 @@ import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -169,7 +167,7 @@ public class TestAbstractListProcessor {
final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
// Create a persistence file of the format anticipated // Create a persistence file of the format anticipated
try (FileOutputStream fos = new FileOutputStream(persistenceFile);) { try (FileOutputStream fos = new FileOutputStream(persistenceFile)) {
fos.write(serviceState.getBytes(StandardCharsets.UTF_8)); fos.write(serviceState.getBytes(StandardCharsets.UTF_8));
} }
@ -439,12 +437,12 @@ public class TestAbstractListProcessor {
private int fetchCount = 0; private int fetchCount = 0;
@Override @Override
public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
return false; return false;
} }
@Override @Override
public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException { public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
return null; return null;
} }
@ -474,32 +472,16 @@ public class TestAbstractListProcessor {
final Object value = stored.remove(key); final Object value = stored.remove(key);
return value != null; return value != null;
} }
@Override
public long removeByPattern(String regex) throws IOException {
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : stored.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(stored.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(stored::remove);
return numRemoved;
}
} }
static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> { static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
final Map<String, ListableEntity> entities = new HashMap<>(); final Map<String, ListableEntity> entities = new HashMap<>();
final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID() + ".json";
String persistenceFolder = "target/"; String persistenceFolder = "target/";
File persistenceFile = new File(persistenceFolder + persistenceFilename); File persistenceFile = new File(persistenceFolder + persistenceFilename);
private static PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder() private static final PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder()
.name("reset-state") .name("reset-state")
.addValidator(Validator.VALID) .addValidator(Validator.VALID)
.build(); .build();
@ -590,7 +572,7 @@ public class TestAbstractListProcessor {
} }
@Override @Override
protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) throws IOException { protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) {
final PropertyValue listingFilter = context.getProperty(LISTING_FILTER); final PropertyValue listingFilter = context.getProperty(LISTING_FILTER);
Predicate<ListableEntity> filter = listingFilter.isSet() Predicate<ListableEntity> filter = listingFilter.isSet()
? entity -> entity.getName().matches(listingFilter.getValue()) ? entity -> entity.getName().matches(listingFilter.getValue())
@ -599,7 +581,7 @@ public class TestAbstractListProcessor {
} }
@Override @Override
protected Integer countUnfilteredListing(final ProcessContext context) throws IOException { protected Integer countUnfilteredListing(final ProcessContext context) {
return entities.size(); return entities.size();
} }

View File

@ -180,11 +180,6 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement
return cache.remove(getCacheEntryKey(key, keySerializer)); return cache.remove(getCacheEntryKey(key, keySerializer));
} }
@Override
public long removeByPattern(final String regex) throws IOException {
return cache.removeAll(new RegexPredicate(regex));
}
private static class RegexPredicate implements Predicate<String>, Serializable { private static class RegexPredicate implements Predicate<String>, Serializable {
private final Pattern pattern; private final Pattern pattern;

View File

@ -162,24 +162,6 @@ public class HazelcastMapCacheClientTest {
thenGetEntryEquals(VALUE); thenGetEntryEquals(VALUE);
} }
@Test
public void testRemoveByPattern() throws Exception {
// given
whenPutEntry("key1", "a");
whenPutEntry("key2", "b");
whenPutEntry("key3", "c");
whenPutEntry("other", "d");
// when
testSubject.removeByPattern("key.*");
// then
thenEntryIsNotInCache("key1");
thenEntryIsNotInCache("key2");
thenEntryIsNotInCache("key3");
thenEntryIsInCache("other");
}
@Test @Test
public void testWhenReplaceNonExistingAtomicEntry() throws Exception { public void testWhenReplaceNonExistingAtomicEntry() throws Exception {
// given // given

View File

@ -17,7 +17,6 @@
package org.apache.nifi.hazelcast.services.cachemanager; package org.apache.nifi.hazelcast.services.cachemanager;
import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hazelcast.services.DummyStringSerializer; import org.apache.nifi.hazelcast.services.DummyStringSerializer;
import org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient; import org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient;
@ -94,15 +93,10 @@ class TestHazelcastProcessor extends AbstractProcessor {
assertTrue(testSubject.containsKey(KEY_1, SERIALIZER)); assertTrue(testSubject.containsKey(KEY_1, SERIALIZER));
assertTrue(testSubject.containsKey(KEY_2, SERIALIZER)); assertTrue(testSubject.containsKey(KEY_2, SERIALIZER));
assertEquals(2, testSubject.removeByPattern("key.*"));
assertTrue(testSubject.replace(new AtomicCacheEntry<>(KEY_1, VALUE_1, 0L), SERIALIZER, SERIALIZER));
assertEquals(VALUE_1, testSubject.fetch(KEY_1, SERIALIZER, SERIALIZER).getValue());
session.transfer(flowFile, REL_SUCCESS); session.transfer(flowFile, REL_SUCCESS);
} catch (final AssertionError| IOException e) { } catch (final AssertionError | IOException e) {
session.transfer(flowFile, REL_FAILURE); session.transfer(flowFile, REL_FAILURE);
e.printStackTrace(); getLogger().error("Processing failed", e);
} }
} }
} }

View File

@ -45,8 +45,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
@ -140,7 +138,7 @@ public class TestGetHBase {
} }
@Test @Test
public void testPersistAndRecoverFromLocalState() throws InitializationException { public void testPersistAndRecoverFromLocalState() {
final File stateFile = new File("target/test-recover-state.bin"); final File stateFile = new File("target/test-recover-state.bin");
if (!stateFile.delete() && stateFile.exists()) { if (!stateFile.delete() && stateFile.exists()) {
fail("Could not delete state file " + stateFile); fail("Could not delete state file " + stateFile);
@ -178,7 +176,7 @@ public class TestGetHBase {
} }
@Test @Test
public void testBecomePrimaryWithNoLocalState() throws InitializationException { public void testBecomePrimaryWithNoLocalState() {
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
final Map<String, String> cells = new HashMap<>(); final Map<String, String> cells = new HashMap<>();
@ -491,7 +489,7 @@ public class TestGetHBase {
} }
@Override @Override
public void close() throws IOException { public void close() {
} }
@Override @Override
@ -500,23 +498,5 @@ public class TestGetHBase {
values.remove(key); values.remove(key);
return true; return true;
} }
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
} }
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}
} }

View File

@ -31,8 +31,6 @@ import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple; import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands; import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration; import org.springframework.data.redis.core.types.Expiration;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -192,7 +190,7 @@ public class SimpleRedisDistributedMapCacheClientService extends AbstractControl
} }
@Override @Override
public void close() throws IOException { public void close() {
// nothing to do // nothing to do
} }
@ -205,33 +203,6 @@ public class SimpleRedisDistributedMapCacheClientService extends AbstractControl
}); });
} }
@Override
public long removeByPattern(final String regex) throws IOException {
return withConnection(redisConnection -> {
long deletedCount = 0;
final List<byte[]> batchKeys = new ArrayList<>();
// delete keys in batches of 1000 using the cursor
final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
while (cursor.hasNext()) {
batchKeys.add(cursor.next());
if (batchKeys.size() == 1000) {
deletedCount += redisConnection.del(getKeys(batchKeys));
batchKeys.clear();
}
}
// delete any left-over keys if some were added to the batch but never reached 1000
if (batchKeys.size() > 0) {
deletedCount += redisConnection.del(getKeys(batchKeys));
batchKeys.clear();
}
return deletedCount;
});
}
/** /**
* Convert the list of all keys to an array. * Convert the list of all keys to an array.
*/ */

View File

@ -44,7 +44,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.springframework.lang.NonNull; import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -64,8 +63,10 @@ import java.util.function.Consumer;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL; import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_MODE_SENTINEL; import static org.apache.nifi.redis.util.RedisUtils.REDIS_MODE_SENTINEL;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
/** /**
@ -363,7 +364,6 @@ public class ITRedisDistributedMapCacheClientService {
return; return;
} }
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final Serializer<String> stringSerializer = new StringSerializer(); final Serializer<String> stringSerializer = new StringSerializer();
final Deserializer<String> stringDeserializer = new StringDeserializer(); final Deserializer<String> stringDeserializer = new StringDeserializer();
final Deserializer<String> stringDeserializerWithoutNullCheck = new StringDeserializerWithoutNullCheck(); final Deserializer<String> stringDeserializerWithoutNullCheck = new StringDeserializerWithoutNullCheck();
@ -386,7 +386,7 @@ public class ITRedisDistributedMapCacheClientService {
// verify get returns null for a key that doesn't exist // verify get returns null for a key that doesn't exist
final String missingValue = cacheClient.get("does-not-exist", stringSerializer, stringDeserializerWithoutNullCheck); final String missingValue = cacheClient.get("does-not-exist", stringSerializer, stringDeserializerWithoutNullCheck);
assertEquals(null, missingValue); assertNull(missingValue);
// verify remove removes the entry and contains key returns false after // verify remove removes the entry and contains key returns false after
assertTrue(cacheClient.remove(key, stringSerializer)); assertTrue(cacheClient.remove(key, stringSerializer));
@ -406,14 +406,14 @@ public class ITRedisDistributedMapCacheClientService {
final String keyThatDoesntExist = key + "_DOES_NOT_EXIST"; final String keyThatDoesntExist = key + "_DOES_NOT_EXIST";
assertFalse(cacheClient.containsKey(keyThatDoesntExist, stringSerializer)); assertFalse(cacheClient.containsKey(keyThatDoesntExist, stringSerializer));
final String getAndPutIfAbsentResultWhenDoesntExist = cacheClient.getAndPutIfAbsent(keyThatDoesntExist, value, stringSerializer, stringSerializer, stringDeserializer); final String getAndPutIfAbsentResultWhenDoesntExist = cacheClient.getAndPutIfAbsent(keyThatDoesntExist, value, stringSerializer, stringSerializer, stringDeserializer);
assertEquals(null, getAndPutIfAbsentResultWhenDoesntExist); assertNull(getAndPutIfAbsentResultWhenDoesntExist);
assertEquals(value, cacheClient.get(keyThatDoesntExist, stringSerializer, stringDeserializer)); assertEquals(value, cacheClient.get(keyThatDoesntExist, stringSerializer, stringDeserializer));
// verify atomic fetch returns the correct entry // verify atomic fetch returns the correct entry
final AtomicCacheEntry<String, String, byte[]> entry = cacheClient.fetch(key, stringSerializer, stringDeserializer); final AtomicCacheEntry<String, String, byte[]> entry = cacheClient.fetch(key, stringSerializer, stringDeserializer);
assertEquals(key, entry.getKey()); assertEquals(key, entry.getKey());
assertEquals(value, entry.getValue()); assertEquals(value, entry.getValue());
assertTrue(Arrays.equals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null))); assertArrayEquals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null));
final AtomicCacheEntry<String, String, byte[]> notLatestEntry = new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), "not previous".getBytes(StandardCharsets.UTF_8)); final AtomicCacheEntry<String, String, byte[]> notLatestEntry = new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), "not previous".getBytes(StandardCharsets.UTF_8));
@ -438,9 +438,6 @@ public class ITRedisDistributedMapCacheClientService {
cacheClient.put(key + "-" + i, value, stringSerializer, stringSerializer); cacheClient.put(key + "-" + i, value, stringSerializer, stringSerializer);
} }
assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete);
assertFalse(cacheClient.containsKey(key, stringSerializer));
Map<String, String> bulk = new HashMap<>(); Map<String, String> bulk = new HashMap<>();
bulk.put("bulk-1", "testing1"); bulk.put("bulk-1", "testing1");
bulk.put("bulk-2", "testing2"); bulk.put("bulk-2", "testing2");
@ -474,14 +471,14 @@ public class ITRedisDistributedMapCacheClientService {
private static class StringDeserializer implements Deserializer<String> { private static class StringDeserializer implements Deserializer<String> {
@Override @Override
public String deserialize(byte[] input) throws DeserializationException, IOException { public String deserialize(byte[] input) throws DeserializationException {
return input == null ? null : new String(input, StandardCharsets.UTF_8); return input == null ? null : new String(input, StandardCharsets.UTF_8);
} }
} }
private static class StringDeserializerWithoutNullCheck implements Deserializer<String> { private static class StringDeserializerWithoutNullCheck implements Deserializer<String> {
@Override @Override
public String deserialize(byte[] input) throws DeserializationException, IOException { public String deserialize(byte[] input) throws DeserializationException {
return new String(input, StandardCharsets.UTF_8); return new String(input, StandardCharsets.UTF_8);
} }
} }

View File

@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
final class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient { final class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient {
private Map<Object, Object> storage; private final Map<Object, Object> storage;
public MockCacheService() { public MockCacheService() {
storage = new HashMap<>(); storage = new HashMap<>();
@ -41,12 +41,12 @@ final class MockCacheService extends AbstractControllerService implements Distri
* @return true if the value was added to the cache, false if it already exists * @return true if the value was added to the cache, false if it already exists
*/ */
@Override @Override
public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
return storage.putIfAbsent(key, value) == null; return storage.putIfAbsent(key, value) == null;
} }
@Override @Override
public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException { public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) {
if (storage.containsKey(key)) { if (storage.containsKey(key)) {
return (V) storage.get(key); return (V) storage.get(key);
} else { } else {
@ -84,9 +84,4 @@ final class MockCacheService extends AbstractControllerService implements Distri
return false; return false;
} }
} }
@Override
public long removeByPattern(String regex) throws IOException {
return 0;
}
} }

View File

@ -177,10 +177,6 @@ public class TestDetectDuplicate {
public void close() throws IOException { public void close() throws IOException {
} }
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
}
@Override @Override
protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected java.util.List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> props = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
@ -192,7 +188,7 @@ public class TestDetectDuplicate {
} }
@Override @Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
if (exists) { if (exists) {
return false; return false;
} }
@ -205,7 +201,7 @@ public class TestDetectDuplicate {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException { final Deserializer<V> valueDeserializer) {
if (exists) { if (exists) {
return (V) cacheValue; return (V) cacheValue;
} }
@ -234,16 +230,6 @@ public class TestDetectDuplicate {
return true; return true;
} }
@Override
public long removeByPattern(String regex) throws IOException {
if (exists) {
exists = false;
return 1L;
} else {
return 0L;
}
}
@Override @Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
cacheValue = value; cacheValue = value;

View File

@ -28,14 +28,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TestFetchDistributedMapCache { public class TestFetchDistributedMapCache {
@ -207,7 +203,7 @@ public class TestFetchDistributedMapCache {
} }
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { private static class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>(); private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false; private boolean failOnCalls = false;
@ -250,7 +246,6 @@ public class TestFetchDistributedMapCache {
} }
@Override @Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail(); verifyNotFail();
if(values.containsKey(key)) { if(values.containsKey(key)) {
@ -261,7 +256,7 @@ public class TestFetchDistributedMapCache {
} }
@Override @Override
public void close() throws IOException { public void close() {
} }
@Override @Override
@ -270,24 +265,5 @@ public class TestFetchDistributedMapCache {
values.remove(key); values.remove(key);
return true; return true;
} }
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
} }
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}
} }

View File

@ -30,14 +30,10 @@ import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertNull;
@ -60,7 +56,7 @@ public class TestNotify {
} }
@Test @Test
public void testNotify() throws InitializationException, IOException { public void testNotify() throws IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
@ -82,7 +78,7 @@ public class TestNotify {
} }
@Test @Test
public void testNotifyCounters() throws InitializationException, IOException { public void testNotifyCounters() throws IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
@ -120,7 +116,7 @@ public class TestNotify {
} }
@Test @Test
public void testNotifyCountersBatch() throws InitializationException, IOException { public void testNotifyCountersBatch() throws IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
@ -174,7 +170,7 @@ public class TestNotify {
} }
@Test @Test
public void testNotifyCountersUsingDelta() throws InitializationException, IOException { public void testNotifyCountersUsingDelta() throws IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
@ -217,7 +213,7 @@ public class TestNotify {
} }
@Test @Test
public void testIllegalDelta() throws InitializationException, IOException { public void testIllegalDelta() throws IOException {
runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
@ -307,9 +303,7 @@ public class TestNotify {
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
props.put("releaseSignalAttribute", "2"); props.put("releaseSignalAttribute", "2");
runner.enqueue(new byte[] {}, props); runner.enqueue(new byte[] {}, props);
final AssertionError e = assertThrows(AssertionError.class, () -> { final AssertionError e = assertThrows(AssertionError.class, () -> runner.run());
runner.run();
});
assertTrue(e.getCause() instanceof RuntimeException); assertTrue(e.getCause() instanceof RuntimeException);
service.setFailOnCalls(false); service.setFailOnCalls(false);
@ -335,15 +329,14 @@ public class TestNotify {
} }
@Override @Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
unsupported(); unsupported();
return false; return false;
} }
@Override @Override
@SuppressWarnings("unchecked")
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException { final Deserializer<V> valueDeserializer) {
unsupported(); unsupported();
return null; return null;
} }
@ -360,7 +353,6 @@ public class TestNotify {
} }
@Override @Override
@SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
verifyNotFail(); verifyNotFail();
@ -383,23 +375,6 @@ public class TestNotify {
return values.remove(key) != null; return values.remove(key) != null;
} }
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
}
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { public <K, V> AtomicCacheEntry<K, V, Long> fetch(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException {

View File

@ -28,14 +28,10 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@ -69,7 +65,7 @@ public class TestPutDistributedMapCache {
} }
@Test @Test
public void testSingleFlowFile() throws InitializationException, IOException { public void testSingleFlowFile() throws IOException {
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
@ -93,7 +89,7 @@ public class TestPutDistributedMapCache {
} }
@Test @Test
public void testNothingToCache() throws InitializationException, IOException { public void testNothingToCache() {
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
final Map<String, String> props = new HashMap<>(); final Map<String, String> props = new HashMap<>();
@ -109,7 +105,7 @@ public class TestPutDistributedMapCache {
} }
@Test @Test
public void testMaxCacheEntrySize() throws InitializationException, IOException { public void testMaxCacheEntrySize() throws IOException {
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${uuid}"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${uuid}");
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "10 B"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "10 B");
@ -134,7 +130,7 @@ public class TestPutDistributedMapCache {
} }
@Test @Test
public void testCacheStrategyReplace() throws InitializationException, IOException { public void testCacheStrategyReplace() throws IOException {
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_REPLACE.getValue()); runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_REPLACE.getValue());
@ -178,7 +174,7 @@ public class TestPutDistributedMapCache {
} }
@Test @Test
public void testCacheStrategyKeepOriginal() throws InitializationException, IOException { public void testCacheStrategyKeepOriginal() throws IOException {
runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}");
runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue()); runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue());
@ -221,7 +217,7 @@ public class TestPutDistributedMapCache {
assertEquals(original, new String(value, "UTF-8")); assertEquals(original, new String(value, "UTF-8"));
} }
private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { private static class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient {
private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>(); private final ConcurrentMap<Object, Object> values = new ConcurrentHashMap<>();
private boolean failOnCalls = false; private boolean failOnCalls = false;
@ -266,7 +262,7 @@ public class TestPutDistributedMapCache {
} }
@Override @Override
public void close() throws IOException { public void close() {
} }
@Override @Override
@ -275,24 +271,5 @@ public class TestPutDistributedMapCache {
values.remove(key); values.remove(key);
return true; return true;
} }
@Override
public long removeByPattern(String regex) throws IOException {
verifyNotFail();
final List<Object> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (Object key : values.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key.toString());
if (m.matches()) {
removedRecords.add(values.get(key));
} }
}
final long numRemoved = removedRecords.size();
removedRecords.forEach(values::remove);
return numRemoved;
}
}
} }

View File

@ -201,31 +201,6 @@ public interface DistributedMapCacheClient extends ControllerService {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
/**
* Removes entries whose keys match the specified pattern
*
* @param regex The regular expression / pattern on which to match the keys to be removed
* @return The number of entries that were removed
* @throws IOException if any error occurred while removing an entry
*/
long removeByPattern(String regex) throws IOException;
/**
* Removes entries whose keys match the specified pattern, and returns a map of entries that
* were removed.
*
* @param <K> type of key
* @param <V> type of value
* @param regex The regular expression / pattern on which to match the keys to be removed
* @param keyDeserializer key deserializer
* @param valueDeserializer value deserializer
* @return A map of key/value entries that were removed from the cache
* @throws IOException if any error occurred while removing an entry
*/
default <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) throws IOException {
throw new UnsupportedOperationException();
}
/** /**
* Returns a set of all keys currently in the cache * Returns a set of all keys currently in the cache
* *

View File

@ -26,7 +26,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.adapter.AtomicCacheEntryInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.AtomicCacheEntryInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.MapInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.MapValuesInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.MapValuesInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.SetInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.SetInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter;
@ -184,19 +183,6 @@ public class DistributedMapCacheClientService extends AbstractControllerService
return cacheClient.removeAndGet(bytesKey, inboundAdapter); return cacheClient.removeAndGet(bytesKey, inboundAdapter);
} }
@Override
public long removeByPattern(String regex) throws IOException {
return cacheClient.removeByPattern(regex);
}
@Override
public <K, V> Map<K, V> removeByPatternAndGet(String regex, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) throws IOException {
final MapInboundAdapter<K, V> inboundAdapter =
new MapInboundAdapter<>(keyDeserializer, valueDeserializer, new HashMap<>());
return cacheClient.removeByPatternAndGet(regex, inboundAdapter);
}
@Override @Override
public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer, public <K, V> AtomicCacheEntry<K, V, Long> fetch(final K key, final Serializer<K> keySerializer,
final Deserializer<V> valueDeserializer) throws IOException { final Deserializer<V> valueDeserializer) throws IOException {

View File

@ -18,8 +18,6 @@ package org.apache.nifi.distributed.cache.client;
import org.apache.nifi.distributed.cache.client.adapter.AtomicCacheEntryInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.AtomicCacheEntryInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.BooleanInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.BooleanInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.LongInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.MapInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.MapValuesInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.MapValuesInboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter;
import org.apache.nifi.distributed.cache.client.adapter.SetInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.SetInboundAdapter;
@ -223,41 +221,6 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient {
return valueAdapter.getResult(); return valueAdapter.getResult();
} }
/**
* Removes entries whose keys match the specified pattern.
*
* @param regex The regular expression / pattern on which to match the keys to be removed
* @return The number of entries that were removed
* @throws IOException if unable to communicate with the remote instance
*/
public long removeByPattern(String regex) throws IOException {
final OutboundAdapter outboundAdapter = new OutboundAdapter()
.write(MapOperation.REMOVE_BY_PATTERN.value())
.write(regex);
final LongInboundAdapter inboundAdapter = new LongInboundAdapter();
invoke(outboundAdapter, inboundAdapter);
return inboundAdapter.getResult();
}
/**
* Removes entries whose keys match the specified pattern, and returns a map of entries that
* were removed.
*
* @param <K> type of key
* @param <V> type of value
* @param regex The regular expression / pattern on which to match the keys to be removed
* @return A map of key/value entries that were removed from the cache
* @throws IOException if unable to communicate with the remote instance
*/
public <K, V> Map<K, V> removeByPatternAndGet(String regex, final MapInboundAdapter<K, V> mapAdapter) throws IOException {
final OutboundAdapter outboundAdapter = new OutboundAdapter()
.minimumVersion(ProtocolVersion.V3.value())
.write(MapOperation.REMOVE_BY_PATTERN_AND_GET.value())
.write(regex);
invoke(outboundAdapter, mapAdapter);
return mapAdapter.getResult();
}
/** /**
* Fetch a CacheEntry with a key. * Fetch a CacheEntry with a key.
* *

View File

@ -29,8 +29,6 @@ public enum MapOperation implements CacheOperation {
PUT_IF_ABSENT("putIfAbsent"), PUT_IF_ABSENT("putIfAbsent"),
REMOVE("remove"), REMOVE("remove"),
REMOVE_AND_GET("removeAndGet"), REMOVE_AND_GET("removeAndGet"),
REMOVE_BY_PATTERN("removeByPattern"),
REMOVE_BY_PATTERN_AND_GET("removeByPatternAndGet"),
REPLACE("replace"), REPLACE("replace"),
SUBMAP("subMap"), SUBMAP("subMap"),
CLOSE("close"); CLOSE("close");

View File

@ -38,7 +38,7 @@ public class ProtocolHandshake {
* If the server doesn't support requested protocol version, HandshakeException will be thrown.</p> * If the server doesn't support requested protocol version, HandshakeException will be thrown.</p>
* *
* <p>DistributedMapCache version histories:<ul> * <p>DistributedMapCache version histories:<ul>
* <li>3: Added subMap, keySet, removeAndGet, removeByPatternAndGet methods.</li> * <li>3: Added subMap, keySet, removeAndGet, methods.</li>
* <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li> * <li>2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.</li>
* <li>1: Initial version.</li> * <li>1: Initial version.</li>
* </ul></p> * </ul></p>

View File

@ -57,10 +57,6 @@ public class MapCacheRequestDecoder extends CacheRequestDecoder {
request = new MapCacheRequest(cacheOperation); request = new MapCacheRequest(cacheOperation);
} else if (MapOperation.REMOVE == cacheOperation) { } else if (MapOperation.REMOVE == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf); request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
request = readPatternRequest(cacheOperation, byteBuf);
} else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
request = readPatternRequest(cacheOperation, byteBuf);
} else if (MapOperation.REMOVE_AND_GET == cacheOperation) { } else if (MapOperation.REMOVE_AND_GET == cacheOperation) {
request = readKeyRequest(cacheOperation, byteBuf); request = readKeyRequest(cacheOperation, byteBuf);
} else if (MapOperation.REPLACE == cacheOperation) { } else if (MapOperation.REPLACE == cacheOperation) {
@ -116,12 +112,6 @@ public class MapCacheRequestDecoder extends CacheRequestDecoder {
return mapCacheRequest; return mapCacheRequest;
} }
private MapCacheRequest readPatternRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final Optional<String> pattern = readUnicodeString(byteBuf);
final Optional<MapCacheRequest> request = pattern.map(requestedPattern -> new MapCacheRequest(cacheOperation, requestedPattern));
return request.orElse(null);
}
private MapCacheRequest readSubMapRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) { private MapCacheRequest readSubMapRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) {
final MapCacheRequest mapCacheRequest; final MapCacheRequest mapCacheRequest;

View File

@ -26,14 +26,12 @@ import org.apache.nifi.distributed.cache.server.map.MapCacheRecord;
import org.apache.nifi.distributed.cache.server.map.MapPutResult; import org.apache.nifi.distributed.cache.server.map.MapPutResult;
import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult; import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult;
import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest; import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest;
import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse;
import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse; import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse;
import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse; import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse;
import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.logging.ComponentLog;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -106,23 +104,6 @@ public class MapCacheRequestHandler extends SimpleChannelInboundHandler<MapCache
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey()); final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer removed = mapCache.remove(key); final ByteBuffer removed = mapCache.remove(key);
writeBytes(channelHandlerContext, cacheOperation, removed); writeBytes(channelHandlerContext, cacheOperation, removed);
} else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) {
final String pattern = mapCacheRequest.getPattern();
final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
final int size = removed == null ? 0 : removed.size();
writeRemoved(channelHandlerContext, cacheOperation, size);
} else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) {
final String pattern = mapCacheRequest.getPattern();
final Map<ByteBuffer, ByteBuffer> removed = mapCache.removeByPattern(pattern);
if (removed == null) {
writeRemoved(channelHandlerContext, cacheOperation, 0);
} else {
writeSize(channelHandlerContext, cacheOperation, removed.size());
for (final Map.Entry<ByteBuffer, ByteBuffer> entry : removed.entrySet()) {
writeBytes(channelHandlerContext, cacheOperation, entry.getKey());
writeBytes(channelHandlerContext, cacheOperation, entry.getValue());
}
}
} else if (MapOperation.REPLACE == cacheOperation) { } else if (MapOperation.REPLACE == cacheOperation) {
final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey()); final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey());
final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue()); final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue());
@ -147,12 +128,6 @@ public class MapCacheRequestHandler extends SimpleChannelInboundHandler<MapCache
channelHandlerContext.writeAndFlush(cacheOperationResult); channelHandlerContext.writeAndFlush(cacheOperationResult);
} }
private void writeRemoved(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final long size) {
final MapRemoveResponse mapRemoveResponse = new MapRemoveResponse(size);
log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);
channelHandlerContext.writeAndFlush(mapRemoveResponse);
}
private void writeSize(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final int size) { private void writeSize(final ChannelHandlerContext channelHandlerContext, final CacheOperation cacheOperation, final int size) {
final MapSizeResponse mapSizeResponse = new MapSizeResponse(size); final MapSizeResponse mapSizeResponse = new MapSizeResponse(size);
log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size); log.debug("Map Cache Operation [{}] Size [{}]", cacheOperation, size);

View File

@ -36,8 +36,6 @@ public interface MapCache {
ByteBuffer remove(ByteBuffer key) throws IOException; ByteBuffer remove(ByteBuffer key) throws IOException;
Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException;
MapCacheRecord fetch(ByteBuffer key) throws IOException; MapCacheRecord fetch(ByteBuffer key) throws IOException;
MapPutResult replace(MapCacheRecord record) throws IOException; MapPutResult replace(MapCacheRecord record) throws IOException;

View File

@ -154,25 +154,6 @@ public class PersistentMapCache implements MapCache {
return removeResult; return removeResult;
} }
@Override
public Map<ByteBuffer, ByteBuffer> removeByPattern(final String regex) throws IOException {
final Map<ByteBuffer, ByteBuffer> removeResult = wrapped.removeByPattern(regex);
if (removeResult != null) {
final List<MapWaliRecord> records = new ArrayList<>(removeResult.size());
for(Map.Entry<ByteBuffer, ByteBuffer> entry : removeResult.entrySet()) {
final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, entry.getKey(), entry.getValue());
records.add(record);
wali.update(records, false);
final long modCount = modifications.getAndIncrement();
if (modCount > 0 && modCount % 1000 == 0) {
wali.checkpoint();
}
}
}
return removeResult;
}
@Override @Override
public Set<ByteBuffer> keySet() throws IOException { public Set<ByteBuffer> keySet() throws IOException {
return wrapped.keySet(); return wrapped.keySet();
@ -255,7 +236,7 @@ public class PersistentMapCache implements MapCache {
@Override @Override
public MapWaliRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { public MapWaliRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
return deserializeEdit(in, new HashMap<Object, MapWaliRecord>(), version); return deserializeEdit(in, new HashMap<>(), version);
} }
@Override @Override
@ -281,7 +262,7 @@ public class PersistentMapCache implements MapCache {
private static class SerdeFactory implements SerDeFactory<MapWaliRecord> { private static class SerdeFactory implements SerDeFactory<MapWaliRecord> {
private Serde serde; private final Serde serde;
public SerdeFactory() { public SerdeFactory() {
this.serde = new Serde(); this.serde = new Serde();

View File

@ -19,7 +19,6 @@ package org.apache.nifi.distributed.cache.server.map;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -29,8 +28,6 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.nifi.distributed.cache.server.EvictionPolicy; import org.apache.nifi.distributed.cache.server.EvictionPolicy;
@ -172,7 +169,7 @@ public class SimpleMapCache implements MapCache {
} }
@Override @Override
public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) throws IOException { public Map<ByteBuffer, ByteBuffer> subMap(List<ByteBuffer> keys) {
if (keys == null) { if (keys == null) {
return null; return null;
} }
@ -212,33 +209,7 @@ public class SimpleMapCache implements MapCache {
} }
@Override @Override
public Map<ByteBuffer, ByteBuffer> removeByPattern(String regex) throws IOException { public MapCacheRecord fetch(ByteBuffer key) {
writeLock.lock();
try {
final Map<ByteBuffer, ByteBuffer> removedMap = new HashMap<>();
final List<MapCacheRecord> removedRecords = new ArrayList<>();
Pattern p = Pattern.compile(regex);
for (ByteBuffer key : cache.keySet()) {
// Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset
Matcher m = p.matcher(new String(key.array()));
if (m.matches()) {
removedRecords.add(cache.get(key));
}
}
removedRecords.forEach((record) -> {
cache.remove(record.getKey());
inverseCacheMap.remove(record);
removedMap.put(record.getKey(), record.getValue());
});
return removedMap;
} finally {
writeLock.unlock();
}
}
@Override
public MapCacheRecord fetch(ByteBuffer key) throws IOException {
readLock.lock(); readLock.lock();
try { try {
final MapCacheRecord record = cache.get(key); final MapCacheRecord record = cache.get(key);
@ -257,7 +228,7 @@ public class SimpleMapCache implements MapCache {
} }
@Override @Override
public MapPutResult replace(MapCacheRecord inputRecord) throws IOException { public MapPutResult replace(MapCacheRecord inputRecord) {
writeLock.lock(); writeLock.lock();
try { try {
final ByteBuffer key = inputRecord.getKey(); final ByteBuffer key = inputRecord.getKey();
@ -275,7 +246,7 @@ public class SimpleMapCache implements MapCache {
} }
@Override @Override
public Set<ByteBuffer> keySet() throws IOException { public Set<ByteBuffer> keySet() {
readLock.lock(); readLock.lock();
try { try {
return cache.keySet(); return cache.keySet();

View File

@ -131,16 +131,6 @@ public class DistributedMapCacheTest {
} }
} }
@Test
public void testRemoveByPattern() throws IOException {
final String[] keys = {"keyRemoveByPattern1", "keyRemoveByPattern2", "keyRemoveByPattern3"};
final String value = "valueRemoveByPattern";
for (String key : keys) {
client.put(key, value, serializer, serializer);
}
assertEquals(3, client.removeByPattern("keyRemoveByPattern\\d{1}"));
}
@Test @Test
public void testSubMap() throws IOException { public void testSubMap() throws IOException {
final String key = "keySubMap"; final String key = "keySubMap";
@ -156,21 +146,6 @@ public class DistributedMapCacheTest {
} }
} }
@Test
public void testRemoveByPatternAndGet() throws IOException {
final String key = "keyRemoveByPatternAndGet";
final String value = "valueRemoveByPatternAndGet";
for (int i = 0; (i < 3); ++i) {
client.put(key + i, value + i, serializer, serializer);
}
final Map<String, String> map = client.removeByPatternAndGet(
"keyRemoveByPatternAndGet\\d{1}", deserializer, deserializer);
assertEquals(3, map.size());
for (int i = 0; (i < 3); ++i) {
assertEquals(value + i, map.get(key + i));
}
}
@Test @Test
public void testReplaceFetchAtomic() throws IOException { public void testReplaceFetchAtomic() throws IOException {
final String key = "keyReplaceAtomic"; final String key = "keyReplaceAtomic";

View File

@ -137,19 +137,6 @@ public class TestDistributedMapServerAndClient {
final Set<String> keys = client.keySet(deserializer); final Set<String> keys = client.keySet(deserializer);
assertEquals(0, keys.size()); assertEquals(0, keys.size());
// Test removeByPattern, the first two should be removed and the last should remain
client.put("test.1", "1", keySerializer, keySerializer);
client.put("test.2", "2", keySerializer, keySerializer);
client.put("test3", "2", keySerializer, keySerializer);
final long removedTwo = client.removeByPattern("test\\..*");
assertEquals(2L, removedTwo);
assertFalse(client.containsKey("test.1", keySerializer));
assertFalse(client.containsKey("test.2", keySerializer));
assertTrue(client.containsKey("test3", keySerializer));
final boolean containedAfterRemove = client.containsKey("testKey", keySerializer);
assertFalse(containedAfterRemove);
client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); client.putIfAbsent("testKey", "test", keySerializer, valueSerializer);
runner.disableControllerService(client); runner.disableControllerService(client);
@ -280,7 +267,6 @@ public class TestDistributedMapServerAndClient {
assertThrows(UnsupportedOperationException.class, () -> client.keySet(stringDeserializer)); assertThrows(UnsupportedOperationException.class, () -> client.keySet(stringDeserializer));
assertThrows(UnsupportedOperationException.class, () -> client.removeAndGet("v.*", stringSerializer, stringDeserializer)); assertThrows(UnsupportedOperationException.class, () -> client.removeAndGet("v.*", stringSerializer, stringDeserializer));
assertThrows(UnsupportedOperationException.class, () ->client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer));
} finally { } finally {
client.close(); client.close();
} }

View File

@ -252,11 +252,6 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp
return contains; return contains;
} }
@Override
public long removeByPattern(String regex) throws IOException {
throw new IOException("HBase removeByPattern is not implemented");
}
@Override @Override
public void close() throws IOException { public void close() throws IOException {
} }

View File

@ -29,7 +29,6 @@ import org.apache.nifi.util.TestRunners;
import org.hamcrest.MatcherAssert; import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -77,7 +76,7 @@ public class TestDistributedMapCacheLookupService {
} }
@Override @Override
public void close() throws IOException { public void close() {
} }
@Override @Override
@ -90,39 +89,34 @@ public class TestDistributedMapCacheLookupService {
} }
@Override @Override
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
throw new UnsupportedOperationException("not implemented"); throw new UnsupportedOperationException("not implemented");
} }
@Override @Override
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException { final Deserializer<V> valueDeserializer) {
throw new UnsupportedOperationException("not implemented"); throw new UnsupportedOperationException("not implemented");
} }
@Override @Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) {
throw new UnsupportedOperationException("not implemented"); throw new UnsupportedOperationException("not implemented");
} }
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException { public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) {
return (V) map.get(key); return (V) map.get(key);
} }
@Override @Override
public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException { public <K> boolean remove(final K key, final Serializer<K> serializer) {
throw new UnsupportedOperationException("not implemented"); throw new UnsupportedOperationException("not implemented");
} }
@Override @Override
public long removeByPattern(String regex) throws IOException { public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
throw new UnsupportedOperationException("not implemented");
}
@Override
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
throw new UnsupportedOperationException("not implemented"); throw new UnsupportedOperationException("not implemented");
} }
} }