diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java index 6a36d485c5..722ca63649 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisDistributedMapCacheClientService.java @@ -18,63 +18,30 @@ package org.apache.nifi.redis.service; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.distributed.cache.client.AtomicCacheEntry; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisType; -import org.apache.nifi.redis.util.RedisAction; -import org.apache.nifi.util.Tuple; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.core.Cursor; -import org.springframework.data.redis.core.ScanOptions; -import org.springframework.data.redis.core.types.Expiration; -import org.springframework.data.redis.connection.RedisStringCommands.SetOption; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL; -import static org.apache.nifi.redis.util.RedisUtils.TTL; @Tags({ "redis", "distributed", "cache", "map" }) @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " + "the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " + "can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " + "provide high-availability configurations.") -public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient { - - static final List PROPERTY_DESCRIPTORS; - static { - final List props = new ArrayList<>(); - props.add(REDIS_CONNECTION_POOL); - props.add(TTL); - PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); - } - - private volatile RedisConnectionPool redisConnectionPool; - private Long ttl; - - @Override - protected List getSupportedPropertyDescriptors() { - return PROPERTY_DESCRIPTORS; - } +public class RedisDistributedMapCacheClientService extends SimpleRedisDistributedMapCacheClientService implements AtomicDistributedMapCacheClient { @Override protected Collection customValidate(ValidationContext validationContext) { @@ -96,179 +63,6 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer return results; } - @OnEnabled - public void onEnabled(final ConfigurationContext context) { - this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); - this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS); - - if (ttl == 0) { - this.ttl = -1L; - } - } - - @OnDisabled - public void onDisabled() { - this.redisConnectionPool = null; - } - - @Override - public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - return withConnection(redisConnection -> { - final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - boolean set = redisConnection.setNX(kv.getKey(), kv.getValue()); - - if (ttl != -1L && set) { - redisConnection.expire(kv.getKey(), ttl); - } - - return set; - }); - } - - @Override - public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { - return withConnection(redisConnection -> { - final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - do { - // start a watch on the key and retrieve the current value - redisConnection.watch(kv.getKey()); - final byte[] existingValue = redisConnection.get(kv.getKey()); - - // start a transaction and perform the put-if-absent - redisConnection.multi(); - redisConnection.setNX(kv.getKey(), kv.getValue()); - - // Set the TTL only if the key doesn't exist already - if (ttl != -1L && existingValue == null) { - redisConnection.expire(kv.getKey(), ttl); - } - - // execute the transaction - final List results = redisConnection.exec(); - - // if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry - // if the results list was null, then the transaction failed - // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation - if (results != null && results.size() > 0) { - final Object firstResult = results.get(0); - if (firstResult instanceof Boolean) { - final Boolean absent = (Boolean) firstResult; - return absent ? null : valueDeserializer.deserialize(existingValue); - } else { - // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop - throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " - + firstResult.getClass().getName() + " with value " + firstResult.toString()); - } - } - } while (isEnabled()); - - return null; - }); - } - - @Override - public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { - return withConnection(redisConnection -> { - final byte[] k = serialize(key, keySerializer); - return redisConnection.exists(k); - }); - } - - @Override - public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - withConnection(redisConnection -> { - final Tuple kv = serialize(key, value, keySerializer, valueSerializer); - redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), SetOption.upsert()); - return null; - }); - } - - @Override - public void putAll(Map keysAndValues, Serializer keySerializer, Serializer valueSerializer) throws IOException { - withConnection(redisConnection -> { - Map values = new HashMap<>(); - for (Map.Entry entry : keysAndValues.entrySet()) { - final Tuple kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); - values.put(kv.getKey(), kv.getValue()); - } - - if (getLogger().isDebugEnabled()) { - getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size())); - } - - if (!values.isEmpty()) { - redisConnection.mSet(values); - if (ttl != -1L) { - values.keySet().forEach(k -> redisConnection.expire(k, ttl)); - } - } - return null; - }); - } - - @Override - public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { - return withConnection(redisConnection -> { - final byte[] k = serialize(key, keySerializer); - final byte[] v = redisConnection.get(k); - return v == null ? null : valueDeserializer.deserialize(v); - }); - } - - @Override - public void close() throws IOException { - // nothing to do - } - - @Override - public boolean remove(final K key, final Serializer keySerializer) throws IOException { - return withConnection(redisConnection -> { - final byte[] k = serialize(key, keySerializer); - final long numRemoved = redisConnection.del(k); - return numRemoved > 0; - }); - } - - @Override - public long removeByPattern(final String regex) throws IOException { - return withConnection(redisConnection -> { - long deletedCount = 0; - final List batchKeys = new ArrayList<>(); - - // delete keys in batches of 1000 using the cursor - final Cursor cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build()); - while (cursor.hasNext()) { - batchKeys.add(cursor.next()); - - if (batchKeys.size() == 1000) { - deletedCount += redisConnection.del(getKeys(batchKeys)); - batchKeys.clear(); - } - } - - // delete any left-over keys if some were added to the batch but never reached 1000 - if (batchKeys.size() > 0) { - deletedCount += redisConnection.del(getKeys(batchKeys)); - batchKeys.clear(); - } - - return deletedCount; - }); - } - - /** - * Convert the list of all keys to an array. - */ - private byte[][] getKeys(final List keys) { - final byte[][] allKeysArray = new byte[keys.size()][]; - for (int i=0; i < keys.size(); i++) { - allKeysArray[i] = keys.get(i); - } - return allKeysArray; - } - - // ----------------- Methods from AtomicDistributedMapCacheClient ------------------------ - @Override public AtomicCacheEntry fetch(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { return withConnection(redisConnection -> { @@ -315,6 +109,7 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer redisConnection.getSet(k, newVal); // set the TTL if specified + final long ttl = getTtl(); if (ttl != -1L) { redisConnection.expire(k, ttl); } @@ -332,43 +127,4 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer }); } - // ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------ - - private Tuple serialize(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - - keySerializer.serialize(key, out); - final byte[] k = out.toByteArray(); - - out.reset(); - - valueSerializer.serialize(value, out); - final byte[] v = out.toByteArray(); - - return new Tuple<>(k, v); - } - - private byte[] serialize(final K key, final Serializer keySerializer) throws IOException { - final ByteArrayOutputStream out = new ByteArrayOutputStream(); - - keySerializer.serialize(key, out); - return out.toByteArray(); - } - - private T withConnection(final RedisAction action) throws IOException { - RedisConnection redisConnection = null; - try { - redisConnection = redisConnectionPool.getConnection(); - return action.execute(redisConnection); - } finally { - if (redisConnection != null) { - try { - redisConnection.close(); - } catch (Exception e) { - getLogger().warn("Error closing connection: " + e.getMessage(), e); - } - } - } - } - } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java new file mode 100644 index 0000000000..33f9340523 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/SimpleRedisDistributedMapCacheClientService.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.service; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.distributed.cache.client.Deserializer; +import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; +import org.apache.nifi.distributed.cache.client.Serializer; +import org.apache.nifi.redis.RedisConnectionPool; +import org.apache.nifi.redis.util.RedisAction; +import org.apache.nifi.util.Tuple; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisStringCommands; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.springframework.data.redis.core.types.Expiration; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL; +import static org.apache.nifi.redis.util.RedisUtils.TTL; + +@Tags({ "redis", "distributed", "cache", "map" }) +@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. " + + "This service is intended to be used when a non-atomic DistributedMapCacheClient is required.") +public class SimpleRedisDistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient { + + static final List PROPERTY_DESCRIPTORS; + static { + final List props = new ArrayList<>(); + props.add(REDIS_CONNECTION_POOL); + props.add(TTL); + PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); + } + + private volatile RedisConnectionPool redisConnectionPool; + private Long ttl; + + protected Long getTtl() { + return ttl; + } + + @Override + protected List getSupportedPropertyDescriptors() { + return PROPERTY_DESCRIPTORS; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); + this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS); + + if (ttl == 0) { + this.ttl = -1L; + } + } + + @OnDisabled + public void onDisabled() { + this.redisConnectionPool = null; + } + + @Override + public boolean putIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + return withConnection(redisConnection -> { + final Tuple kv = serialize(key, value, keySerializer, valueSerializer); + boolean set = redisConnection.setNX(kv.getKey(), kv.getValue()); + + if (ttl != -1L && set) { + redisConnection.expire(kv.getKey(), ttl); + } + + return set; + }); + } + + @Override + public V getAndPutIfAbsent(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, final Deserializer valueDeserializer) throws IOException { + return withConnection(redisConnection -> { + final Tuple kv = serialize(key, value, keySerializer, valueSerializer); + do { + // start a watch on the key and retrieve the current value + redisConnection.watch(kv.getKey()); + final byte[] existingValue = redisConnection.get(kv.getKey()); + + // start a transaction and perform the put-if-absent + redisConnection.multi(); + redisConnection.setNX(kv.getKey(), kv.getValue()); + + // Set the TTL only if the key doesn't exist already + if (ttl != -1L && existingValue == null) { + redisConnection.expire(kv.getKey(), ttl); + } + + // execute the transaction + final List results = redisConnection.exec(); + + // if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry + // if the results list was null, then the transaction failed + // if the results list has results, then the transaction succeeded and it should have the result of the setNX operation + if (results != null && results.size() > 0) { + final Object firstResult = results.get(0); + if (firstResult instanceof Boolean) { + final Boolean absent = (Boolean) firstResult; + return absent ? null : valueDeserializer.deserialize(existingValue); + } else { + // this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop + throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got " + + firstResult.getClass().getName() + " with value " + firstResult.toString()); + } + } + } while (isEnabled()); + + return null; + }); + } + + @Override + public boolean containsKey(final K key, final Serializer keySerializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + return redisConnection.exists(k); + }); + } + + @Override + public void put(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + withConnection(redisConnection -> { + final Tuple kv = serialize(key, value, keySerializer, valueSerializer); + redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), RedisStringCommands.SetOption.upsert()); + return null; + }); + } + + @Override + public void putAll(Map keysAndValues, Serializer keySerializer, Serializer valueSerializer) throws IOException { + withConnection(redisConnection -> { + Map values = new HashMap<>(); + for (Map.Entry entry : keysAndValues.entrySet()) { + final Tuple kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer); + values.put(kv.getKey(), kv.getValue()); + } + + if (getLogger().isDebugEnabled()) { + getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size())); + } + + if (!values.isEmpty()) { + redisConnection.mSet(values); + if (ttl != -1L) { + values.keySet().forEach(k -> redisConnection.expire(k, ttl)); + } + } + return null; + }); + } + + @Override + public V get(final K key, final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + final byte[] v = redisConnection.get(k); + return v == null ? null : valueDeserializer.deserialize(v); + }); + } + + @Override + public void close() throws IOException { + // nothing to do + } + + @Override + public boolean remove(final K key, final Serializer keySerializer) throws IOException { + return withConnection(redisConnection -> { + final byte[] k = serialize(key, keySerializer); + final long numRemoved = redisConnection.del(k); + return numRemoved > 0; + }); + } + + @Override + public long removeByPattern(final String regex) throws IOException { + return withConnection(redisConnection -> { + long deletedCount = 0; + final List batchKeys = new ArrayList<>(); + + // delete keys in batches of 1000 using the cursor + final Cursor cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build()); + while (cursor.hasNext()) { + batchKeys.add(cursor.next()); + + if (batchKeys.size() == 1000) { + deletedCount += redisConnection.del(getKeys(batchKeys)); + batchKeys.clear(); + } + } + + // delete any left-over keys if some were added to the batch but never reached 1000 + if (batchKeys.size() > 0) { + deletedCount += redisConnection.del(getKeys(batchKeys)); + batchKeys.clear(); + } + + return deletedCount; + }); + } + + /** + * Convert the list of all keys to an array. + */ + protected byte[][] getKeys(final List keys) { + final byte[][] allKeysArray = new byte[keys.size()][]; + for (int i=0; i < keys.size(); i++) { + allKeysArray[i] = keys.get(i); + } + return allKeysArray; + } + + protected Tuple serialize(final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + keySerializer.serialize(key, out); + final byte[] k = out.toByteArray(); + + out.reset(); + + valueSerializer.serialize(value, out); + final byte[] v = out.toByteArray(); + + return new Tuple<>(k, v); + } + + protected byte[] serialize(final K key, final Serializer keySerializer) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + keySerializer.serialize(key, out); + return out.toByteArray(); + } + + protected T withConnection(final RedisAction action) throws IOException { + RedisConnection redisConnection = null; + try { + redisConnection = redisConnectionPool.getConnection(); + return action.execute(redisConnection); + } finally { + if (redisConnection != null) { + try { + redisConnection.close(); + } catch (Exception e) { + getLogger().warn("Error closing connection: " + e.getMessage(), e); + } + } + } + } +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 5d4073fdd5..c7da33be1f 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -13,4 +13,5 @@ # See the License for the specific language governing permissions and # limitations under the License. org.apache.nifi.redis.service.RedisConnectionPoolService -org.apache.nifi.redis.service.RedisDistributedMapCacheClientService \ No newline at end of file +org.apache.nifi.redis.service.RedisDistributedMapCacheClientService +org.apache.nifi.redis.service.SimpleRedisDistributedMapCacheClientService \ No newline at end of file