mirror of https://github.com/apache/nifi.git
NIFI-12104 Separate a non-atomic Redis DMC implementation from the existing one for use in Put/Fetch DMC when Redis is clustered
This closes #7796 Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
@ -18,63 +18,30 @@ package org.apache.nifi.redis.service;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration;
import org.springframework.data.redis.connection.RedisStringCommands.SetOption;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.apache.nifi.redis.util.RedisUtils.TTL;
@Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
"the WATCH, MULTI, and EXEC commands in Redis, which are not fully supported when Redis is clustered. As a result, this service " +
"can only be used with a Redis Connection Pool that is configured for standalone or sentinel mode. Sentinel mode can be used to " +
"provide high-availability configurations.")
public class RedisDistributedMapCacheClientService extends AbstractControllerService implements AtomicDistributedMapCacheClient<byte[]> {
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
private volatile RedisConnectionPool redisConnectionPool;
private Long ttl;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
public class RedisDistributedMapCacheClientService extends SimpleRedisDistributedMapCacheClientService implements AtomicDistributedMapCacheClient<byte[]> {
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
@ -96,179 +63,6 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
return results;
public void onEnabled(final ConfigurationContext context) {
this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
if (ttl == 0) {
this.ttl = -1L;
public void onDisabled() {
this.redisConnectionPool = null;
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
if (ttl != -1L && set) {
redisConnection.expire(kv.getKey(), ttl);
return set;
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
do {
// start a watch on the key and retrieve the current value
final byte[] existingValue = redisConnection.get(kv.getKey());
// start a transaction and perform the put-if-absent
redisConnection.setNX(kv.getKey(), kv.getValue());
// Set the TTL only if the key doesn't exist already
if (ttl != -1L && existingValue == null) {
redisConnection.expire(kv.getKey(), ttl);
// execute the transaction
final List<Object> results = redisConnection.exec();
// if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry
// if the results list was null, then the transaction failed
// if the results list has results, then the transaction succeeded and it should have the result of the setNX operation
if (results != null && results.size() > 0) {
final Object firstResult = results.get(0);
if (firstResult instanceof Boolean) {
final Boolean absent = (Boolean) firstResult;
return absent ? null : valueDeserializer.deserialize(existingValue);
} else {
// this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop
throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got "
+ firstResult.getClass().getName() + " with value " + firstResult.toString());
} while (isEnabled());
return null;
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
return redisConnection.exists(k);
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), SetOption.upsert());
return null;
public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
Map<byte[], byte[]> values = new HashMap<>();
for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
values.put(kv.getKey(), kv.getValue());
if (getLogger().isDebugEnabled()) {
getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size()));
if (!values.isEmpty()) {
if (ttl != -1L) {
values.keySet().forEach(k -> redisConnection.expire(k, ttl));
return null;
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final byte[] v = redisConnection.get(k);
return v == null ? null : valueDeserializer.deserialize(v);
public void close() throws IOException {
// nothing to do
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final long numRemoved = redisConnection.del(k);
return numRemoved > 0;
public long removeByPattern(final String regex) throws IOException {
return withConnection(redisConnection -> {
long deletedCount = 0;
final List<byte[]> batchKeys = new ArrayList<>();
// delete keys in batches of 1000 using the cursor
final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
while (cursor.hasNext()) {
if (batchKeys.size() == 1000) {
deletedCount += redisConnection.del(getKeys(batchKeys));
// delete any left-over keys if some were added to the batch but never reached 1000
if (batchKeys.size() > 0) {
deletedCount += redisConnection.del(getKeys(batchKeys));
return deletedCount;
* Convert the list of all keys to an array.
private byte[][] getKeys(final List<byte[]> keys) {
final byte[][] allKeysArray = new byte[keys.size()][];
for (int i=0; i < keys.size(); i++) {
allKeysArray[i] = keys.get(i);
return allKeysArray;
// ----------------- Methods from AtomicDistributedMapCacheClient ------------------------
public <K, V> AtomicCacheEntry<K, V, byte[]> fetch(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
@ -315,6 +109,7 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
redisConnection.getSet(k, newVal);
// set the TTL if specified
final long ttl = getTtl();
if (ttl != -1L) {
redisConnection.expire(k, ttl);
@ -332,43 +127,4 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
// ----------------- END Methods from AtomicDistributedMapCacheClient ------------------------
private <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
final byte[] k = out.toByteArray();
valueSerializer.serialize(value, out);
final byte[] v = out.toByteArray();
return new Tuple<>(k, v);
private <K> byte[] serialize(final K key, final Serializer<K> keySerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
return out.toByteArray();
private <T> T withConnection(final RedisAction<T> action) throws IOException {
RedisConnection redisConnection = null;
try {
redisConnection = redisConnectionPool.getConnection();
return action.execute(redisConnection);
} finally {
if (redisConnection != null) {
try {
} catch (Exception e) {
getLogger().warn("Error closing connection: " + e.getMessage(), e);
@ -0,0 +1,282 @@
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.redis.service;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.util.RedisAction;
import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.nifi.redis.util.RedisUtils.REDIS_CONNECTION_POOL;
import static org.apache.nifi.redis.util.RedisUtils.TTL;
@Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. " +
"This service is intended to be used when a non-atomic DistributedMapCacheClient is required.")
public class SimpleRedisDistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static {
final List<PropertyDescriptor> props = new ArrayList<>();
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
private volatile RedisConnectionPool redisConnectionPool;
private Long ttl;
protected Long getTtl() {
return ttl;
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
public void onEnabled(final ConfigurationContext context) {
this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
if (ttl == 0) {
this.ttl = -1L;
public void onDisabled() {
this.redisConnectionPool = null;
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
if (ttl != -1L && set) {
redisConnection.expire(kv.getKey(), ttl);
return set;
public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
do {
// start a watch on the key and retrieve the current value
final byte[] existingValue = redisConnection.get(kv.getKey());
// start a transaction and perform the put-if-absent
redisConnection.setNX(kv.getKey(), kv.getValue());
// Set the TTL only if the key doesn't exist already
if (ttl != -1L && existingValue == null) {
redisConnection.expire(kv.getKey(), ttl);
// execute the transaction
final List<Object> results = redisConnection.exec();
// if the results list was empty, then the transaction failed (i.e. key was modified after we started watching), so keep looping to retry
// if the results list was null, then the transaction failed
// if the results list has results, then the transaction succeeded and it should have the result of the setNX operation
if (results != null && results.size() > 0) {
final Object firstResult = results.get(0);
if (firstResult instanceof Boolean) {
final Boolean absent = (Boolean) firstResult;
return absent ? null : valueDeserializer.deserialize(existingValue);
} else {
// this shouldn't really happen, but just in case there is a non-boolean result then bounce out of the loop
throw new IOException("Unexpected result from Redis transaction: Expected Boolean result, but got "
+ firstResult.getClass().getName() + " with value " + firstResult.toString());
} while (isEnabled());
return null;
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
return redisConnection.exists(k);
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), RedisStringCommands.SetOption.upsert());
return null;
public <K, V> void putAll(Map<K, V> keysAndValues, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> {
Map<byte[], byte[]> values = new HashMap<>();
for (Map.Entry<K, V> entry : keysAndValues.entrySet()) {
final Tuple<byte[],byte[]> kv = serialize(entry.getKey(), entry.getValue(), keySerializer, valueSerializer);
values.put(kv.getKey(), kv.getValue());
if (getLogger().isDebugEnabled()) {
getLogger().debug(String.format("Queued up %d tuples to mset on Redis connection.", values.size()));
if (!values.isEmpty()) {
if (ttl != -1L) {
values.keySet().forEach(k -> redisConnection.expire(k, ttl));
return null;
public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final byte[] v = redisConnection.get(k);
return v == null ? null : valueDeserializer.deserialize(v);
public void close() throws IOException {
// nothing to do
public <K> boolean remove(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> {
final byte[] k = serialize(key, keySerializer);
final long numRemoved = redisConnection.del(k);
return numRemoved > 0;
public long removeByPattern(final String regex) throws IOException {
return withConnection(redisConnection -> {
long deletedCount = 0;
final List<byte[]> batchKeys = new ArrayList<>();
// delete keys in batches of 1000 using the cursor
final Cursor<byte[]> cursor = redisConnection.scan(ScanOptions.scanOptions().count(100).match(regex).build());
while (cursor.hasNext()) {
if (batchKeys.size() == 1000) {
deletedCount += redisConnection.del(getKeys(batchKeys));
// delete any left-over keys if some were added to the batch but never reached 1000
if (batchKeys.size() > 0) {
deletedCount += redisConnection.del(getKeys(batchKeys));
return deletedCount;
* Convert the list of all keys to an array.
protected byte[][] getKeys(final List<byte[]> keys) {
final byte[][] allKeysArray = new byte[keys.size()][];
for (int i=0; i < keys.size(); i++) {
allKeysArray[i] = keys.get(i);
return allKeysArray;
protected <K, V> Tuple<byte[],byte[]> serialize(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
final byte[] k = out.toByteArray();
valueSerializer.serialize(value, out);
final byte[] v = out.toByteArray();
return new Tuple<>(k, v);
protected <K> byte[] serialize(final K key, final Serializer<K> keySerializer) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream();
keySerializer.serialize(key, out);
return out.toByteArray();
protected <T> T withConnection(final RedisAction<T> action) throws IOException {
RedisConnection redisConnection = null;
try {
redisConnection = redisConnectionPool.getConnection();
return action.execute(redisConnection);
} finally {
if (redisConnection != null) {
try {
} catch (Exception e) {
getLogger().warn("Error closing connection: " + e.getMessage(), e);
@ -14,3 +14,4 @@
# limitations under the License.
Reference in New Issue