diff --git a/nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/mocks/MockDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/mocks/MockDistributedMapCacheClient.java index 7a07e2e22d..6c71d59f56 100644 --- a/nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/mocks/MockDistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-asana-bundle/nifi-asana-processors/src/test/java/org/apache/nifi/processors/asana/mocks/MockDistributedMapCacheClient.java @@ -70,9 +70,4 @@ public class MockDistributedMapCacheClient extends AbstractControllerService imp public boolean remove(K key, Serializer serializer) { throw new NotImplementedException(); } - - @Override - public long removeByPattern(String regex) { - throw new NotImplementedException(); - } } diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java index 892e5009a8..6c1bd8e947 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-distributedmapcache-service/src/main/java/org/apache/nifi/controller/cassandra/CassandraDistributedMapCache.java @@ -223,11 +223,6 @@ public class CassandraDistributedMapCache extends AbstractControllerService impl return true; } - @Override - public long removeByPattern(String s) throws IOException { - throw new UnsupportedOperationException(); - } - private byte[] serializeKey(K k, Serializer keySerializer) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); keySerializer.serialize(k, out); diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java index f2b7cd7c20..4b4ae3859d 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseMapCacheClient.java @@ -24,10 +24,6 @@ import com.couchbase.client.java.document.Document; import com.couchbase.client.java.error.CASMismatchException; import com.couchbase.client.java.error.DocumentAlreadyExistsException; import com.couchbase.client.java.error.DocumentDoesNotExistException; -import com.couchbase.client.java.query.Delete; -import com.couchbase.client.java.query.N1qlQuery; -import com.couchbase.client.java.query.N1qlQueryResult; -import com.couchbase.client.java.query.Statement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -38,27 +34,21 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import static com.couchbase.client.java.query.dsl.functions.PatternMatchingFunctions.regexpContains; import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.BUCKET_NAME; import static org.apache.nifi.couchbase.CouchbaseConfigurationProperties.COUCHBASE_CLUSTER_SERVICE; -// TODO: Doc @Tags({"distributed", "cache", "map", "cluster", "couchbase"}) @CapabilityDescription("Provides the ability to communicate with a Couchbase Server cluster as a DistributedMapCacheServer." + " This can be used in order to share a Map between nodes in a NiFi cluster." + " Couchbase Server cluster can provide a high available and persistent cache storage.") public class CouchbaseMapCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient { - private static final Logger logger = LoggerFactory.getLogger(CouchbaseMapCacheClient.class); - private CouchbaseClusterControllerService clusterService; private Bucket bucket; @@ -196,7 +186,7 @@ public class CouchbaseMapCacheClient extends AbstractControllerService implement } @Override - public void close() throws IOException { + public void close() { } @Override @@ -208,14 +198,4 @@ public class CouchbaseMapCacheClient extends AbstractControllerService implement return false; } } - - @Override - public long removeByPattern(String regex) throws IOException { - Statement statement = Delete.deleteFromCurrentBucket().where(regexpContains("meta().id", regex)); - final N1qlQueryResult result = bucket.query(N1qlQuery.simple(statement)); - if (logger.isDebugEnabled()) { - logger.debug("Deleted documents using regex {}, result={}", regex, result); - } - return result.info().mutationCount(); - } } diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html index d8c5011aec..ab837aaaf3 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/resources/docs/org.apache.nifi.couchbase.CouchbaseMapCacheClient/additionalDetails.html @@ -29,12 +29,6 @@ Following cache operations require N1QL query, thus you need to deploy Couchbase Server 4.0 or higher for those operations. However, as of this writing (May 2017) there are only few processors using these operations. Most cache APIs are implemented using document id lookup and should work with older version of Couchbase Server. -
    -
  • removeByPattern(String regex): This cache API removes entries by regex. Execute query like: -
    delete from `cache-bucket-name` where REGEX_CONTAINS(meta().id, "^key.*")
    -
  • -
- In order to make N1QL work correctly you need to create a Primary index or an index covering N1QL queries performed by CouchbaseMapCacheClient. Please refer Couchbase Server documentations for how to create those. diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index 31bcac98a5..afc970ab0a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-listed-entity/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -66,8 +66,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -169,7 +167,7 @@ public class TestAbstractListProcessor { final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; // Create a persistence file of the format anticipated - try (FileOutputStream fos = new FileOutputStream(persistenceFile);) { + try (FileOutputStream fos = new FileOutputStream(persistenceFile)) { fos.write(serviceState.getBytes(StandardCharsets.UTF_8)); } @@ -439,12 +437,12 @@ public class TestAbstractListProcessor { private int fetchCount = 0; @Override - public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) { return false; } @Override - public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { + public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) { return null; } @@ -474,32 +472,16 @@ public class TestAbstractListProcessor { final Object value = stored.remove(key); return value != null; } - - @Override - public long removeByPattern(String regex) throws IOException { - final List removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (Object key : stored.keySet()) { - // Key must be backed by something that can be converted into a String - Matcher m = p.matcher(key.toString()); - if (m.matches()) { - removedRecords.add(stored.get(key)); - } - } - final long numRemoved = removedRecords.size(); - removedRecords.forEach(stored::remove); - return numRemoved; - } } static class ConcreteListProcessor extends AbstractListProcessor { final Map entities = new HashMap<>(); - final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; + final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID() + ".json"; String persistenceFolder = "target/"; File persistenceFile = new File(persistenceFolder + persistenceFilename); - private static PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder() + private static final PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder() .name("reset-state") .addValidator(Validator.VALID) .build(); @@ -590,7 +572,7 @@ public class TestAbstractListProcessor { } @Override - protected List performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) throws IOException { + protected List performListing(final ProcessContext context, final Long minTimestamp, ListingMode listingMode) { final PropertyValue listingFilter = context.getProperty(LISTING_FILTER); Predicate filter = listingFilter.isSet() ? entity -> entity.getName().matches(listingFilter.getValue()) @@ -599,7 +581,7 @@ public class TestAbstractListProcessor { } @Override - protected Integer countUnfilteredListing(final ProcessContext context) throws IOException { + protected Integer countUnfilteredListing(final ProcessContext context) { return entities.size(); } diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java index ce01135d7b..0d4eb4f952 100644 --- a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java +++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/main/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClient.java @@ -180,11 +180,6 @@ public class HazelcastMapCacheClient extends AbstractControllerService implement return cache.remove(getCacheEntryKey(key, keySerializer)); } - @Override - public long removeByPattern(final String regex) throws IOException { - return cache.removeAll(new RegexPredicate(regex)); - } - private static class RegexPredicate implements Predicate, Serializable { private final Pattern pattern; diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java index a63241e825..9f47d10079 100644 --- a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java +++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cacheclient/HazelcastMapCacheClientTest.java @@ -162,24 +162,6 @@ public class HazelcastMapCacheClientTest { thenGetEntryEquals(VALUE); } - @Test - public void testRemoveByPattern() throws Exception { - // given - whenPutEntry("key1", "a"); - whenPutEntry("key2", "b"); - whenPutEntry("key3", "c"); - whenPutEntry("other", "d"); - - // when - testSubject.removeByPattern("key.*"); - - // then - thenEntryIsNotInCache("key1"); - thenEntryIsNotInCache("key2"); - thenEntryIsNotInCache("key3"); - thenEntryIsInCache("other"); - } - @Test public void testWhenReplaceNonExistingAtomicEntry() throws Exception { // given diff --git a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java index d08decac6d..76a1474fa9 100644 --- a/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java +++ b/nifi-nar-bundles/nifi-hazelcast-bundle/nifi-hazelcast-services/src/test/java/org/apache/nifi/hazelcast/services/cachemanager/TestHazelcastProcessor.java @@ -17,7 +17,6 @@ package org.apache.nifi.hazelcast.services.cachemanager; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.hazelcast.services.DummyStringSerializer; import org.apache.nifi.hazelcast.services.cacheclient.HazelcastMapCacheClient; @@ -94,15 +93,10 @@ class TestHazelcastProcessor extends AbstractProcessor { assertTrue(testSubject.containsKey(KEY_1, SERIALIZER)); assertTrue(testSubject.containsKey(KEY_2, SERIALIZER)); - assertEquals(2, testSubject.removeByPattern("key.*")); - - assertTrue(testSubject.replace(new AtomicCacheEntry<>(KEY_1, VALUE_1, 0L), SERIALIZER, SERIALIZER)); - assertEquals(VALUE_1, testSubject.fetch(KEY_1, SERIALIZER, SERIALIZER).getValue()); - session.transfer(flowFile, REL_SUCCESS); - } catch (final AssertionError| IOException e) { + } catch (final AssertionError | IOException e) { session.transfer(flowFile, REL_FAILURE); - e.printStackTrace(); + getLogger().error("Processing failed", e); } } } diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java index 3254a65451..5245d142d7 100644 --- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java +++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestGetHBase.java @@ -45,8 +45,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -140,7 +138,7 @@ public class TestGetHBase { } @Test - public void testPersistAndRecoverFromLocalState() throws InitializationException { + public void testPersistAndRecoverFromLocalState() { final File stateFile = new File("target/test-recover-state.bin"); if (!stateFile.delete() && stateFile.exists()) { fail("Could not delete state file " + stateFile); @@ -178,7 +176,7 @@ public class TestGetHBase { } @Test - public void testBecomePrimaryWithNoLocalState() throws InitializationException { + public void testBecomePrimaryWithNoLocalState() { final long now = System.currentTimeMillis(); final Map cells = new HashMap<>(); @@ -491,7 +489,7 @@ public class TestGetHBase { } @Override - public void close() throws IOException { + public void close() { } @Override @@ -500,23 +498,5 @@ public class TestGetHBase { values.remove(key); return true; } - - @Override - public long removeByPattern(String regex) throws IOException { - verifyNotFail(); - final List removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (Object key : values.keySet()) { - // Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset - Matcher m = p.matcher(key.toString()); - if (m.matches()) { - removedRecords.add(values.get(key)); - } - } - final long numRemoved = removedRecords.size(); - removedRecords.forEach(values::remove); - return numRemoved; - } } - } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java index 33f9340523..488ba8733e 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java @@ -31,8 +31,6 @@ import org.apache.nifi.redis.util.RedisAction; import org.apache.nifi.util.Tuple; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisStringCommands; -import org.springframework.data.redis.core.Cursor; -import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.types.Expiration; import java.io.ByteArrayOutputStream; @@ -192,7 +190,7 @@ public class SimpleRedisDistributedMapCacheClientService extends AbstractControl } @Override - public void close() throws IOException { + public void close() { // nothing to do } @@ -205,33 +203,6 @@ public class SimpleRedisDistributedMapCacheClientService extends AbstractControl }); } - @Override - public long removeByPattern(final String regex) throws IOException { - return withConnection(redisConnection -> { - long deletedCount = 0; - final List batchKeys = new ArrayList<>(); - - // delete keys in batches of 1000 using the cursor - final Cursor cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build()); - while (cursor.hasNext()) { - batchKeys.add(cursor.next()); - - if (batchKeys.size() == 1000) { - deletedCount += redisConnection.del(getKeys(batchKeys)); - batchKeys.clear(); - } - } - - // delete any left-over keys if some were added to the batch but never reached 1000 - if (batchKeys.size() > 0) { - deletedCount += redisConnection.del(getKeys(batchKeys)); - batchKeys.clear(); - } - - return deletedCount; - }); - } - /** * Convert the list of all keys to an array. */ diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java index bc3961b7b1..51a40ce855 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/ITRedisDistributedMapCacheClientService.java @@ -44,7 +44,6 @@ import org.junit.jupiter.api.io.TempDir; import org.springframework.lang.NonNull; import org.springframework.lang.Nullable; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.InetSocketAddress; @@ -64,8 +63,10 @@ import java.util.function.Consumer; import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL; import static org.apache.nifi.redis.util.RedisUtils.REDIS_MODE_SENTINEL; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -363,7 +364,6 @@ public class ITRedisDistributedMapCacheClientService { return; } - final ByteArrayOutputStream out = new ByteArrayOutputStream(); final Serializer stringSerializer = new StringSerializer(); final Deserializer stringDeserializer = new StringDeserializer(); final Deserializer stringDeserializerWithoutNullCheck = new StringDeserializerWithoutNullCheck(); @@ -386,7 +386,7 @@ public class ITRedisDistributedMapCacheClientService { // verify get returns null for a key that doesn't exist final String missingValue = cacheClient.get("does-not-exist", stringSerializer, stringDeserializerWithoutNullCheck); - assertEquals(null, missingValue); + assertNull(missingValue); // verify remove removes the entry and contains key returns false after assertTrue(cacheClient.remove(key, stringSerializer)); @@ -406,14 +406,14 @@ public class ITRedisDistributedMapCacheClientService { final String keyThatDoesntExist = key + "_DOES_NOT_EXIST"; assertFalse(cacheClient.containsKey(keyThatDoesntExist, stringSerializer)); final String getAndPutIfAbsentResultWhenDoesntExist = cacheClient.getAndPutIfAbsent(keyThatDoesntExist, value, stringSerializer, stringSerializer, stringDeserializer); - assertEquals(null, getAndPutIfAbsentResultWhenDoesntExist); + assertNull(getAndPutIfAbsentResultWhenDoesntExist); assertEquals(value, cacheClient.get(keyThatDoesntExist, stringSerializer, stringDeserializer)); // verify atomic fetch returns the correct entry final AtomicCacheEntry entry = cacheClient.fetch(key, stringSerializer, stringDeserializer); assertEquals(key, entry.getKey()); assertEquals(value, entry.getValue()); - assertTrue(Arrays.equals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null))); + assertArrayEquals(value.getBytes(StandardCharsets.UTF_8), entry.getRevision().orElse(null)); final AtomicCacheEntry notLatestEntry = new AtomicCacheEntry<>(entry.getKey(), entry.getValue(), "not previous".getBytes(StandardCharsets.UTF_8)); @@ -438,9 +438,6 @@ public class ITRedisDistributedMapCacheClientService { cacheClient.put(key + "-" + i, value, stringSerializer, stringSerializer); } - assertTrue(cacheClient.removeByPattern("test-redis-processor-*") >= numToDelete); - assertFalse(cacheClient.containsKey(key, stringSerializer)); - Map bulk = new HashMap<>(); bulk.put("bulk-1", "testing1"); bulk.put("bulk-2", "testing2"); @@ -474,14 +471,14 @@ public class ITRedisDistributedMapCacheClientService { private static class StringDeserializer implements Deserializer { @Override - public String deserialize(byte[] input) throws DeserializationException, IOException { + public String deserialize(byte[] input) throws DeserializationException { return input == null ? null : new String(input, StandardCharsets.UTF_8); } } private static class StringDeserializerWithoutNullCheck implements Deserializer { @Override - public String deserialize(byte[] input) throws DeserializationException, IOException { + public String deserialize(byte[] input) throws DeserializationException { return new String(input, StandardCharsets.UTF_8); } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java index f6428b4ffa..fe91724bc2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/MockCacheService.java @@ -26,7 +26,7 @@ import java.util.HashMap; import java.util.Map; final class MockCacheService extends AbstractControllerService implements DistributedMapCacheClient { - private Map storage; + private final Map storage; public MockCacheService() { storage = new HashMap<>(); @@ -41,12 +41,12 @@ final class MockCacheService extends AbstractControllerService implements Distri * @return true if the value was added to the cache, false if it already exists */ @Override - public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) throws IOException { + public boolean putIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer) { return storage.putIfAbsent(key, value) == null; } @Override - public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) throws IOException { + public V getAndPutIfAbsent(K key, V value, Serializer keySerializer, Serializer valueSerializer, Deserializer valueDeserializer) { if (storage.containsKey(key)) { return (V) storage.get(key); } else { @@ -84,9 +84,4 @@ final class MockCacheService extends AbstractControllerService implements Distri return false; } } - - @Override - public long removeByPattern(String regex) throws IOException { - return 0; - } } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java index 3eb7ace2bc..f6b2c412ba 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDetectDuplicate.java @@ -177,10 +177,6 @@ public class TestDetectDuplicate { public void close() throws IOException { } - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - } - @Override protected java.util.List getSupportedPropertyDescriptors() { final List props = new ArrayList<>(); @@ -192,7 +188,7 @@ public class TestDetectDuplicate { } @Override - public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) { if (exists) { return false; } @@ -205,7 +201,7 @@ public class TestDetectDuplicate { @Override @SuppressWarnings("unchecked") public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, - final Deserializer valueDeserializer) throws IOException { + final Deserializer valueDeserializer) { if (exists) { return (V) cacheValue; } @@ -234,16 +230,6 @@ public class TestDetectDuplicate { return true; } - @Override - public long removeByPattern(String regex) throws IOException { - if (exists) { - exists = false; - return 1L; - } else { - return 0L; - } - } - @Override public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { cacheValue = value; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java index 686d913754..047c53f786 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchDistributedMapCache.java @@ -28,14 +28,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; public class TestFetchDistributedMapCache { @@ -207,7 +203,7 @@ public class TestFetchDistributedMapCache { } - private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { + private static class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { private final ConcurrentMap values = new ConcurrentHashMap<>(); private boolean failOnCalls = false; @@ -250,7 +246,6 @@ public class TestFetchDistributedMapCache { } @Override - @SuppressWarnings("unchecked") public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { verifyNotFail(); if(values.containsKey(key)) { @@ -261,7 +256,7 @@ public class TestFetchDistributedMapCache { } @Override - public void close() throws IOException { + public void close() { } @Override @@ -270,24 +265,5 @@ public class TestFetchDistributedMapCache { values.remove(key); return true; } - - @Override - public long removeByPattern(String regex) throws IOException { - verifyNotFail(); - final List removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (Object key : values.keySet()) { - // Key must be backed by something that can be converted into a String - Matcher m = p.matcher(key.toString()); - if (m.matches()) { - removedRecords.add(values.get(key)); - } - } - final long numRemoved = removedRecords.size(); - removedRecords.forEach(values::remove); - return numRemoved; - } } - - } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java index a4613b1206..ef03a6005c 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestNotify.java @@ -30,14 +30,10 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -60,7 +56,7 @@ public class TestNotify { } @Test - public void testNotify() throws InitializationException, IOException { + public void testNotify() throws IOException { runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); @@ -82,7 +78,7 @@ public class TestNotify { } @Test - public void testNotifyCounters() throws InitializationException, IOException { + public void testNotifyCounters() throws IOException { runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); @@ -120,7 +116,7 @@ public class TestNotify { } @Test - public void testNotifyCountersBatch() throws InitializationException, IOException { + public void testNotifyCountersBatch() throws IOException { runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); @@ -174,7 +170,7 @@ public class TestNotify { } @Test - public void testNotifyCountersUsingDelta() throws InitializationException, IOException { + public void testNotifyCountersUsingDelta() throws IOException { runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); @@ -217,7 +213,7 @@ public class TestNotify { } @Test - public void testIllegalDelta() throws InitializationException, IOException { + public void testIllegalDelta() throws IOException { runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}"); runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*"); runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}"); @@ -307,9 +303,7 @@ public class TestNotify { final Map props = new HashMap<>(); props.put("releaseSignalAttribute", "2"); runner.enqueue(new byte[] {}, props); - final AssertionError e = assertThrows(AssertionError.class, () -> { - runner.run(); - }); + final AssertionError e = assertThrows(AssertionError.class, () -> runner.run()); assertTrue(e.getCause() instanceof RuntimeException); service.setFailOnCalls(false); @@ -335,15 +329,14 @@ public class TestNotify { } @Override - public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) { unsupported(); return false; } @Override - @SuppressWarnings("unchecked") public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, - final Deserializer valueDeserializer) throws IOException { + final Deserializer valueDeserializer) { unsupported(); return null; } @@ -360,7 +353,6 @@ public class TestNotify { } @Override - @SuppressWarnings("unchecked") public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { verifyNotFail(); @@ -383,23 +375,6 @@ public class TestNotify { return values.remove(key) != null; } - @Override - public long removeByPattern(String regex) throws IOException { - verifyNotFail(); - final List removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (Object key : values.keySet()) { - // Key must be backed by something that can be converted into a String - Matcher m = p.matcher(key.toString()); - if (m.matches()) { - removedRecords.add(values.get(key)); - } - } - final long numRemoved = removedRecords.size(); - removedRecords.forEach(values::remove); - return numRemoved; - } - @Override @SuppressWarnings("unchecked") public AtomicCacheEntry fetch(K key, Serializer keySerializer, Deserializer valueDeserializer) throws IOException { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java index 04c2e18799..5eb1fba5d9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestPutDistributedMapCache.java @@ -28,14 +28,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -69,7 +65,7 @@ public class TestPutDistributedMapCache { } @Test - public void testSingleFlowFile() throws InitializationException, IOException { + public void testSingleFlowFile() throws IOException { runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); final Map props = new HashMap<>(); @@ -93,7 +89,7 @@ public class TestPutDistributedMapCache { } @Test - public void testNothingToCache() throws InitializationException, IOException { + public void testNothingToCache() { runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); final Map props = new HashMap<>(); @@ -109,7 +105,7 @@ public class TestPutDistributedMapCache { } @Test - public void testMaxCacheEntrySize() throws InitializationException, IOException { + public void testMaxCacheEntrySize() throws IOException { runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${uuid}"); runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_MAX_BYTES, "10 B"); @@ -134,7 +130,7 @@ public class TestPutDistributedMapCache { } @Test - public void testCacheStrategyReplace() throws InitializationException, IOException { + public void testCacheStrategyReplace() throws IOException { runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_REPLACE.getValue()); @@ -178,7 +174,7 @@ public class TestPutDistributedMapCache { } @Test - public void testCacheStrategyKeepOriginal() throws InitializationException, IOException { + public void testCacheStrategyKeepOriginal() throws IOException { runner.setProperty(PutDistributedMapCache.CACHE_ENTRY_IDENTIFIER, "${cacheKeyAttribute}"); runner.setProperty(PutDistributedMapCache.CACHE_UPDATE_STRATEGY, PutDistributedMapCache.CACHE_UPDATE_KEEP_ORIGINAL.getValue()); @@ -221,7 +217,7 @@ public class TestPutDistributedMapCache { assertEquals(original, new String(value, "UTF-8")); } - private class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { + private static class MockCacheClient extends AbstractControllerService implements DistributedMapCacheClient { private final ConcurrentMap values = new ConcurrentHashMap<>(); private boolean failOnCalls = false; @@ -266,7 +262,7 @@ public class TestPutDistributedMapCache { } @Override - public void close() throws IOException { + public void close() { } @Override @@ -275,24 +271,5 @@ public class TestPutDistributedMapCache { values.remove(key); return true; } - - @Override - public long removeByPattern(String regex) throws IOException { - verifyNotFail(); - final List removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (Object key : values.keySet()) { - // Key must be backed by something that can be converted into a String - Matcher m = p.matcher(key.toString()); - if (m.matches()) { - removedRecords.add(values.get(key)); - } - } - final long numRemoved = removedRecords.size(); - removedRecords.forEach(values::remove); - return numRemoved; - } } - - } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java index d018f651e2..44ce971769 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-client-service-api/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClient.java @@ -201,31 +201,6 @@ public interface DistributedMapCacheClient extends ControllerService { throw new UnsupportedOperationException(); } - /** - * Removes entries whose keys match the specified pattern - * - * @param regex The regular expression / pattern on which to match the keys to be removed - * @return The number of entries that were removed - * @throws IOException if any error occurred while removing an entry - */ - long removeByPattern(String regex) throws IOException; - - /** - * Removes entries whose keys match the specified pattern, and returns a map of entries that - * were removed. - * - * @param type of key - * @param type of value - * @param regex The regular expression / pattern on which to match the keys to be removed - * @param keyDeserializer key deserializer - * @param valueDeserializer value deserializer - * @return A map of key/value entries that were removed from the cache - * @throws IOException if any error occurred while removing an entry - */ - default Map removeByPatternAndGet(String regex, Deserializer keyDeserializer, Deserializer valueDeserializer) throws IOException { - throw new UnsupportedOperationException(); - } - /** * Returns a set of all keys currently in the cache * diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java index 775b3932e4..f92d4634f5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java @@ -26,7 +26,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.distributed.cache.client.adapter.AtomicCacheEntryInboundAdapter; -import org.apache.nifi.distributed.cache.client.adapter.MapInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.MapValuesInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.SetInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.ValueInboundAdapter; @@ -184,19 +183,6 @@ public class DistributedMapCacheClientService extends AbstractControllerService return cacheClient.removeAndGet(bytesKey, inboundAdapter); } - @Override - public long removeByPattern(String regex) throws IOException { - return cacheClient.removeByPattern(regex); - } - - @Override - public Map removeByPatternAndGet(String regex, Deserializer keyDeserializer, - Deserializer valueDeserializer) throws IOException { - final MapInboundAdapter inboundAdapter = - new MapInboundAdapter<>(keyDeserializer, valueDeserializer, new HashMap<>()); - return cacheClient.removeByPatternAndGet(regex, inboundAdapter); - } - @Override public AtomicCacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java index f87a5cf623..694d241549 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/NettyDistributedMapCacheClient.java @@ -18,8 +18,6 @@ package org.apache.nifi.distributed.cache.client; import org.apache.nifi.distributed.cache.client.adapter.AtomicCacheEntryInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.BooleanInboundAdapter; -import org.apache.nifi.distributed.cache.client.adapter.LongInboundAdapter; -import org.apache.nifi.distributed.cache.client.adapter.MapInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.MapValuesInboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.OutboundAdapter; import org.apache.nifi.distributed.cache.client.adapter.SetInboundAdapter; @@ -223,41 +221,6 @@ public class NettyDistributedMapCacheClient extends DistributedCacheClient { return valueAdapter.getResult(); } - /** - * Removes entries whose keys match the specified pattern. - * - * @param regex The regular expression / pattern on which to match the keys to be removed - * @return The number of entries that were removed - * @throws IOException if unable to communicate with the remote instance - */ - public long removeByPattern(String regex) throws IOException { - final OutboundAdapter outboundAdapter = new OutboundAdapter() - .write(MapOperation.REMOVE_BY_PATTERN.value()) - .write(regex); - final LongInboundAdapter inboundAdapter = new LongInboundAdapter(); - invoke(outboundAdapter, inboundAdapter); - return inboundAdapter.getResult(); - } - - /** - * Removes entries whose keys match the specified pattern, and returns a map of entries that - * were removed. - * - * @param type of key - * @param type of value - * @param regex The regular expression / pattern on which to match the keys to be removed - * @return A map of key/value entries that were removed from the cache - * @throws IOException if unable to communicate with the remote instance - */ - public Map removeByPatternAndGet(String regex, final MapInboundAdapter mapAdapter) throws IOException { - final OutboundAdapter outboundAdapter = new OutboundAdapter() - .minimumVersion(ProtocolVersion.V3.value()) - .write(MapOperation.REMOVE_BY_PATTERN_AND_GET.value()) - .write(regex); - invoke(outboundAdapter, mapAdapter); - return mapAdapter.getResult(); - } - /** * Fetch a CacheEntry with a key. * diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java index e44edee5ed..1feb6cfc48 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/operations/MapOperation.java @@ -29,8 +29,6 @@ public enum MapOperation implements CacheOperation { PUT_IF_ABSENT("putIfAbsent"), REMOVE("remove"), REMOVE_AND_GET("removeAndGet"), - REMOVE_BY_PATTERN("removeByPattern"), - REMOVE_BY_PATTERN_AND_GET("removeByPatternAndGet"), REPLACE("replace"), SUBMAP("subMap"), CLOSE("close"); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java index 3f2e26aff1..a0b0c3b44b 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java @@ -38,7 +38,7 @@ public class ProtocolHandshake { * If the server doesn't support requested protocol version, HandshakeException will be thrown.

* *

DistributedMapCache version histories:

    - *
  • 3: Added subMap, keySet, removeAndGet, removeByPatternAndGet methods.
  • + *
  • 3: Added subMap, keySet, removeAndGet, methods.
  • *
  • 2: Added atomic update operations (fetch and replace) using optimistic lock with revision number.
  • *
  • 1: Initial version.
  • *

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java index f482d6912b..44421ee707 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestDecoder.java @@ -57,10 +57,6 @@ public class MapCacheRequestDecoder extends CacheRequestDecoder { request = new MapCacheRequest(cacheOperation); } else if (MapOperation.REMOVE == cacheOperation) { request = readKeyRequest(cacheOperation, byteBuf); - } else if (MapOperation.REMOVE_BY_PATTERN == cacheOperation) { - request = readPatternRequest(cacheOperation, byteBuf); - } else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) { - request = readPatternRequest(cacheOperation, byteBuf); } else if (MapOperation.REMOVE_AND_GET == cacheOperation) { request = readKeyRequest(cacheOperation, byteBuf); } else if (MapOperation.REPLACE == cacheOperation) { @@ -116,12 +112,6 @@ public class MapCacheRequestDecoder extends CacheRequestDecoder { return mapCacheRequest; } - private MapCacheRequest readPatternRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) { - final Optional pattern = readUnicodeString(byteBuf); - final Optional request = pattern.map(requestedPattern -> new MapCacheRequest(cacheOperation, requestedPattern)); - return request.orElse(null); - } - private MapCacheRequest readSubMapRequest(final CacheOperation cacheOperation, final ByteBuf byteBuf) { final MapCacheRequest mapCacheRequest; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java index 2674962d1d..66f50ecc24 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/codec/MapCacheRequestHandler.java @@ -26,14 +26,12 @@ import org.apache.nifi.distributed.cache.server.map.MapCacheRecord; import org.apache.nifi.distributed.cache.server.map.MapPutResult; import org.apache.nifi.distributed.cache.server.protocol.CacheOperationResult; import org.apache.nifi.distributed.cache.server.protocol.MapCacheRequest; -import org.apache.nifi.distributed.cache.server.protocol.MapRemoveResponse; import org.apache.nifi.distributed.cache.server.protocol.MapSizeResponse; import org.apache.nifi.distributed.cache.server.protocol.MapValueResponse; import org.apache.nifi.logging.ComponentLog; import java.nio.ByteBuffer; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Set; @@ -106,23 +104,6 @@ public class MapCacheRequestHandler extends SimpleChannelInboundHandler removed = mapCache.removeByPattern(pattern); - final int size = removed == null ? 0 : removed.size(); - writeRemoved(channelHandlerContext, cacheOperation, size); - } else if (MapOperation.REMOVE_BY_PATTERN_AND_GET == cacheOperation) { - final String pattern = mapCacheRequest.getPattern(); - final Map removed = mapCache.removeByPattern(pattern); - if (removed == null) { - writeRemoved(channelHandlerContext, cacheOperation, 0); - } else { - writeSize(channelHandlerContext, cacheOperation, removed.size()); - for (final Map.Entry entry : removed.entrySet()) { - writeBytes(channelHandlerContext, cacheOperation, entry.getKey()); - writeBytes(channelHandlerContext, cacheOperation, entry.getValue()); - } - } } else if (MapOperation.REPLACE == cacheOperation) { final ByteBuffer key = ByteBuffer.wrap(mapCacheRequest.getKey()); final ByteBuffer value = ByteBuffer.wrap(mapCacheRequest.getValue()); @@ -147,12 +128,6 @@ public class MapCacheRequestHandler extends SimpleChannelInboundHandler removeByPattern(String regex) throws IOException; - MapCacheRecord fetch(ByteBuffer key) throws IOException; MapPutResult replace(MapCacheRecord record) throws IOException; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java index 03ed1deaf9..c74c5519b0 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/PersistentMapCache.java @@ -154,25 +154,6 @@ public class PersistentMapCache implements MapCache { return removeResult; } - @Override - public Map removeByPattern(final String regex) throws IOException { - final Map removeResult = wrapped.removeByPattern(regex); - if (removeResult != null) { - final List records = new ArrayList<>(removeResult.size()); - for(Map.Entry entry : removeResult.entrySet()) { - final MapWaliRecord record = new MapWaliRecord(UpdateType.DELETE, entry.getKey(), entry.getValue()); - records.add(record); - wali.update(records, false); - - final long modCount = modifications.getAndIncrement(); - if (modCount > 0 && modCount % 1000 == 0) { - wali.checkpoint(); - } - } - } - return removeResult; - } - @Override public Set keySet() throws IOException { return wrapped.keySet(); @@ -255,7 +236,7 @@ public class PersistentMapCache implements MapCache { @Override public MapWaliRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { - return deserializeEdit(in, new HashMap(), version); + return deserializeEdit(in, new HashMap<>(), version); } @Override @@ -281,7 +262,7 @@ public class PersistentMapCache implements MapCache { private static class SerdeFactory implements SerDeFactory { - private Serde serde; + private final Serde serde; public SerdeFactory() { this.serde = new Serde(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java index 8571432549..d920760d7e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/SimpleMapCache.java @@ -19,7 +19,6 @@ package org.apache.nifi.distributed.cache.server.map; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -29,8 +28,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.nifi.distributed.cache.server.EvictionPolicy; @@ -172,7 +169,7 @@ public class SimpleMapCache implements MapCache { } @Override - public Map subMap(List keys) throws IOException { + public Map subMap(List keys) { if (keys == null) { return null; } @@ -212,33 +209,7 @@ public class SimpleMapCache implements MapCache { } @Override - public Map removeByPattern(String regex) throws IOException { - writeLock.lock(); - try { - final Map removedMap = new HashMap<>(); - final List removedRecords = new ArrayList<>(); - Pattern p = Pattern.compile(regex); - for (ByteBuffer key : cache.keySet()) { - // Key must be backed by something that array() returns a byte[] that can be converted into a String via the default charset - Matcher m = p.matcher(new String(key.array())); - if (m.matches()) { - removedRecords.add(cache.get(key)); - } - } - removedRecords.forEach((record) -> { - cache.remove(record.getKey()); - inverseCacheMap.remove(record); - removedMap.put(record.getKey(), record.getValue()); - }); - - return removedMap; - } finally { - writeLock.unlock(); - } - } - - @Override - public MapCacheRecord fetch(ByteBuffer key) throws IOException { + public MapCacheRecord fetch(ByteBuffer key) { readLock.lock(); try { final MapCacheRecord record = cache.get(key); @@ -257,7 +228,7 @@ public class SimpleMapCache implements MapCache { } @Override - public MapPutResult replace(MapCacheRecord inputRecord) throws IOException { + public MapPutResult replace(MapCacheRecord inputRecord) { writeLock.lock(); try { final ByteBuffer key = inputRecord.getKey(); @@ -275,7 +246,7 @@ public class SimpleMapCache implements MapCache { } @Override - public Set keySet() throws IOException { + public Set keySet() { readLock.lock(); try { return cache.keySet(); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java index 16e6060af4..8a0d5303d3 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheTest.java @@ -131,16 +131,6 @@ public class DistributedMapCacheTest { } } - @Test - public void testRemoveByPattern() throws IOException { - final String[] keys = {"keyRemoveByPattern1", "keyRemoveByPattern2", "keyRemoveByPattern3"}; - final String value = "valueRemoveByPattern"; - for (String key : keys) { - client.put(key, value, serializer, serializer); - } - assertEquals(3, client.removeByPattern("keyRemoveByPattern\\d{1}")); - } - @Test public void testSubMap() throws IOException { final String key = "keySubMap"; @@ -156,21 +146,6 @@ public class DistributedMapCacheTest { } } - @Test - public void testRemoveByPatternAndGet() throws IOException { - final String key = "keyRemoveByPatternAndGet"; - final String value = "valueRemoveByPatternAndGet"; - for (int i = 0; (i < 3); ++i) { - client.put(key + i, value + i, serializer, serializer); - } - final Map map = client.removeByPatternAndGet( - "keyRemoveByPatternAndGet\\d{1}", deserializer, deserializer); - assertEquals(3, map.size()); - for (int i = 0; (i < 3); ++i) { - assertEquals(value + i, map.get(key + i)); - } - } - @Test public void testReplaceFetchAtomic() throws IOException { final String key = "keyReplaceAtomic"; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java index b16061a7f7..58c8506293 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java @@ -137,19 +137,6 @@ public class TestDistributedMapServerAndClient { final Set keys = client.keySet(deserializer); assertEquals(0, keys.size()); - // Test removeByPattern, the first two should be removed and the last should remain - client.put("test.1", "1", keySerializer, keySerializer); - client.put("test.2", "2", keySerializer, keySerializer); - client.put("test3", "2", keySerializer, keySerializer); - final long removedTwo = client.removeByPattern("test\\..*"); - assertEquals(2L, removedTwo); - assertFalse(client.containsKey("test.1", keySerializer)); - assertFalse(client.containsKey("test.2", keySerializer)); - assertTrue(client.containsKey("test3", keySerializer)); - - final boolean containedAfterRemove = client.containsKey("testKey", keySerializer); - assertFalse(containedAfterRemove); - client.putIfAbsent("testKey", "test", keySerializer, valueSerializer); runner.disableControllerService(client); @@ -280,7 +267,6 @@ public class TestDistributedMapServerAndClient { assertThrows(UnsupportedOperationException.class, () -> client.keySet(stringDeserializer)); assertThrows(UnsupportedOperationException.class, () -> client.removeAndGet("v.*", stringSerializer, stringDeserializer)); - assertThrows(UnsupportedOperationException.class, () ->client.removeByPatternAndGet("v.*", stringDeserializer, stringDeserializer)); } finally { client.close(); } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java index 21a3b536d0..fe7a9ddf9f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_2-client-service-bundle/nifi-hbase_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_2_ClientMapCacheService.java @@ -252,11 +252,6 @@ public class HBase_2_ClientMapCacheService extends AbstractControllerService imp return contains; } - @Override - public long removeByPattern(String regex) throws IOException { - throw new IOException("HBase removeByPattern is not implemented"); - } - @Override public void close() throws IOException { } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java index 648dc39a07..b34a5788e5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/src/test/java/org/apache/nifi/lookup/TestDistributedMapCacheLookupService.java @@ -29,7 +29,6 @@ import org.apache.nifi.util.TestRunners; import org.hamcrest.MatcherAssert; import org.junit.jupiter.api.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -77,7 +76,7 @@ public class TestDistributedMapCacheLookupService { } @Override - public void close() throws IOException { + public void close() { } @Override @@ -90,39 +89,34 @@ public class TestDistributedMapCacheLookupService { } @Override - public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) { throw new UnsupportedOperationException("not implemented"); } @Override public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, - final Deserializer valueDeserializer) throws IOException { + final Deserializer valueDeserializer) { throw new UnsupportedOperationException("not implemented"); } @Override - public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + public boolean containsKey(final K key, final Serializer keySerializer) { throw new UnsupportedOperationException("not implemented"); } @Override @SuppressWarnings("unchecked") - public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) { return (V) map.get(key); } @Override - public boolean remove(final K key, final Serializer serializer) throws IOException { + public boolean remove(final K key, final Serializer serializer) { throw new UnsupportedOperationException("not implemented"); } @Override - public long removeByPattern(String regex) throws IOException { - throw new UnsupportedOperationException("not implemented"); - } - - @Override - public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) { throw new UnsupportedOperationException("not implemented"); } }