NIFI-5830 - RedisConnectionPoolService does not work with Standalone Redis using non-localhost deployment

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #3176.
This commit is contained in:
Alexander Bukarev 2018-12-04 22:15:37 +03:00 committed by Pierre Villard
parent f1e03b5ed5
commit 84c32f9137
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
1 changed files with 30 additions and 30 deletions

View File

@ -27,11 +27,15 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.redis.RedisType;
import org.apache.nifi.util.StringUtils;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisConfiguration;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -264,19 +268,29 @@ public class RedisUtils {
final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
final JedisClientConfiguration jedisClientConfiguration = JedisClientConfiguration.builder()
.connectTimeout(Duration.ofMillis(timeout))
.readTimeout(Duration.ofMillis(timeout))
.usePooling()
.poolConfig(poolConfig)
.build();
JedisConnectionFactory connectionFactory;
if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
final JedisShardInfo jedisShardInfo = createJedisShardInfo(connectionString, timeout, password);
logger.info("Connecting to Redis in standalone mode at " + connectionString);
connectionFactory = new JedisConnectionFactory(jedisShardInfo);
final String[] hostAndPortSplit = connectionString.split("[:]");
final String host = hostAndPortSplit[0].trim();
final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
final RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
enrichRedisConfiguration(redisStandaloneConfiguration, dbIndex, password);
connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration);
} else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
final String[] sentinels = connectionString.split("[,]");
final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels)));
final JedisShardInfo jedisShardInfo = createJedisShardInfo(sentinels[0], timeout, password);
enrichRedisConfiguration(sentinelConfiguration, dbIndex, password);
logger.info("Connecting to Redis in sentinel mode...");
logger.info("Redis master = " + sentinelMaster);
@ -285,14 +299,14 @@ public class RedisUtils {
logger.info("Redis sentinel at " + sentinel);
}
connectionFactory = new JedisConnectionFactory(sentinelConfiguration, poolConfig);
connectionFactory.setShardInfo(jedisShardInfo);
connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration);
} else {
final String[] clusterNodes = connectionString.split("[,]");
final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes));
enrichRedisConfiguration(clusterConfiguration, dbIndex, password);
clusterConfiguration.setMaxRedirects(maxRedirects);
logger.info("Connecting to Redis in clustered mode...");
@ -300,16 +314,7 @@ public class RedisUtils {
logger.info("Redis cluster node at " + clusterNode);
}
connectionFactory = new JedisConnectionFactory(clusterConfiguration, poolConfig);
}
connectionFactory.setUsePool(true);
connectionFactory.setPoolConfig(poolConfig);
connectionFactory.setDatabase(dbIndex);
connectionFactory.setTimeout(timeout);
if (!StringUtils.isBlank(password)) {
connectionFactory.setPassword(password);
connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration);
}
// need to call this to initialize the pool/connections
@ -325,20 +330,15 @@ public class RedisUtils {
return trimmedValues;
}
private static JedisShardInfo createJedisShardInfo(final String hostAndPort, final Integer timeout, final String password) {
final String[] hostAndPortSplit = hostAndPort.split("[:]");
final String host = hostAndPortSplit[0].trim();
final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
final JedisShardInfo jedisShardInfo = new JedisShardInfo(host, port);
jedisShardInfo.setConnectionTimeout(timeout);
jedisShardInfo.setSoTimeout(timeout);
if (!StringUtils.isEmpty(password)) {
jedisShardInfo.setPassword(password);
private static void enrichRedisConfiguration(final RedisConfiguration redisConfiguration,
final Integer dbIndex,
final String password) {
if (redisConfiguration instanceof RedisConfiguration.WithDatabaseIndex) {
((RedisConfiguration.WithDatabaseIndex) redisConfiguration).setDatabase(dbIndex);
}
if (redisConfiguration instanceof RedisConfiguration.WithPassword) {
((RedisConfiguration.WithPassword) redisConfiguration).setPassword(RedisPassword.of(password));
}
return jedisShardInfo;
}
private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {