NIFI-4987: Added TTL to RedisDistributedMapCacheClientService

NIFI-4987: PR Review Fixes - Reverted getAndPutIfAbsent and added TTL setting with a different approach

NIFI-4987: PR Review Fixes - Added TTL to putIfAbsent()

This closes #2726.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
zenfenan 2018-05-20 19:22:26 +05:30 committed by Bryan Bende
parent d75ba167cd
commit 06d45c3a6e
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39

View File

@ -29,6 +29,7 @@ import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisConnectionPool;
import org.apache.nifi.redis.RedisType; import org.apache.nifi.redis.RedisType;
import org.apache.nifi.redis.util.RedisAction; import org.apache.nifi.redis.util.RedisAction;
@ -36,6 +37,7 @@ import org.apache.nifi.util.Tuple;
import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions; import org.springframework.data.redis.core.ScanOptions;
import org.springframework.data.redis.core.types.Expiration;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
@ -44,6 +46,7 @@ import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
@Tags({ "redis", "distributed", "cache", "map" }) @Tags({ "redis", "distributed", "cache", "map" })
@CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " + @CapabilityDescription("An implementation of DistributedMapCacheClient that uses Redis as the backing cache. This service relies on " +
@ -59,14 +62,25 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
.required(true) .required(true)
.build(); .build();
public static final PropertyDescriptor TTL = new PropertyDescriptor.Builder()
.name("redis-cache-ttl")
.displayName("TTL")
.description("Indicates how long the data should exist in Redis. Setting '0 secs' would mean the data would exist forever")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.required(true)
.defaultValue("0 secs")
.build();
static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS; static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS;
static { static {
final List<PropertyDescriptor> props = new ArrayList<>(); final List<PropertyDescriptor> props = new ArrayList<>();
props.add(REDIS_CONNECTION_POOL); props.add(REDIS_CONNECTION_POOL);
props.add(TTL);
PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
} }
private volatile RedisConnectionPool redisConnectionPool; private volatile RedisConnectionPool redisConnectionPool;
private Long ttl;
@Override @Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -96,6 +110,11 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
@OnEnabled @OnEnabled
public void onEnabled(final ConfigurationContext context) { public void onEnabled(final ConfigurationContext context) {
this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class); this.redisConnectionPool = context.getProperty(REDIS_CONNECTION_POOL).asControllerService(RedisConnectionPool.class);
this.ttl = context.getProperty(TTL).asTimePeriod(TimeUnit.SECONDS);
if (ttl == 0) {
this.ttl = -1L;
}
} }
@OnDisabled @OnDisabled
@ -107,7 +126,13 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
return withConnection(redisConnection -> { return withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
return redisConnection.setNX(kv.getKey(), kv.getValue()); boolean set = redisConnection.setNX(kv.getKey(), kv.getValue());
if (ttl != -1L && set) {
redisConnection.expire(kv.getKey(), ttl);
}
return set;
}); });
} }
@ -124,6 +149,11 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
redisConnection.multi(); redisConnection.multi();
redisConnection.setNX(kv.getKey(), kv.getValue()); redisConnection.setNX(kv.getKey(), kv.getValue());
// Set the TTL only if the key doesn't exist already
if (ttl != -1L && existingValue == null) {
redisConnection.expire(kv.getKey(), ttl);
}
// execute the transaction // execute the transaction
final List<Object> results = redisConnection.exec(); final List<Object> results = redisConnection.exec();
@ -146,7 +176,6 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
}); });
} }
@Override @Override
public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException { public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
return withConnection(redisConnection -> { return withConnection(redisConnection -> {
@ -159,7 +188,7 @@ public class RedisDistributedMapCacheClientService extends AbstractControllerSer
public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException { public <K, V> void put(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
withConnection(redisConnection -> { withConnection(redisConnection -> {
final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer); final Tuple<byte[],byte[]> kv = serialize(key, value, keySerializer, valueSerializer);
redisConnection.set(kv.getKey(), kv.getValue()); redisConnection.set(kv.getKey(), kv.getValue(), Expiration.seconds(ttl), null);
return null; return null;
}); });
} }