mirror of https://github.com/apache/nifi.git
NIFI-12149 Create nifi-redis-utils and minor improvements to util methods
Signed-off-by: Joe Gresock <jgresock@gmail.com> This closes #7812.
This commit is contained in:
parent
7ddcb91605
commit
551625f7bf
|
@ -38,6 +38,11 @@
|
|||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-redis-utils</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-redis</artifactId>
|
||||
|
|
|
@ -90,7 +90,7 @@ RedisConnectionPoolService extends AbstractControllerService implements RedisCon
|
|||
if (connectionFactory == null) {
|
||||
synchronized (this) {
|
||||
if (connectionFactory == null) {
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, getLogger(), sslContext);
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, sslContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -344,7 +344,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
// visible for testing
|
||||
synchronized RedisConnection getRedis() {
|
||||
if (connectionFactory == null) {
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, logger, sslContext);
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, sslContext);
|
||||
}
|
||||
|
||||
return connectionFactory.getConnection();
|
||||
|
|
|
@ -126,7 +126,7 @@ public class TestRedisConnectionPoolService {
|
|||
final SSLContextService sslContextService = configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
providedSslContext = sslContextService.createContext();
|
||||
}
|
||||
JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), providedSslContext);
|
||||
JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, providedSslContext);
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-redis-bundle</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>nifi-redis-utils</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-redis-service-api</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-redis</artifactId>
|
||||
<version>${spring.data.redis.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>redis.clients</groupId>
|
||||
<artifactId>jedis</artifactId>
|
||||
<version>${jedis.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-utils</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>junit-jupiter</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.redis.util;
|
||||
|
||||
import org.apache.nifi.redis.RedisType;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
|
||||
public class RedisConfig {
|
||||
|
||||
private final RedisType redisMode;
|
||||
private final String connectionString;
|
||||
|
||||
private String sentinelMaster;
|
||||
private String sentinelPassword;
|
||||
private String password;
|
||||
|
||||
private int dbIndex = 0;
|
||||
private int timeout = 10000;
|
||||
private int clusterMaxRedirects = 5;
|
||||
|
||||
private int poolMaxTotal = 8;
|
||||
private int poolMaxIdle = 8;
|
||||
private int poolMinIdle = 0;
|
||||
private boolean blockWhenExhausted = true;
|
||||
private Duration maxWaitTime = Duration.ofSeconds(10);
|
||||
private Duration minEvictableIdleTime = Duration.ofSeconds(60);
|
||||
private Duration timeBetweenEvictionRuns = Duration.ofSeconds(30);
|
||||
private int numTestsPerEvictionRun = -1;
|
||||
private boolean testOnCreate = true;
|
||||
private boolean testOnBorrow = true;
|
||||
private boolean testOnReturn = false;
|
||||
private boolean testWhenIdle = true;
|
||||
|
||||
public RedisConfig(final RedisType redisMode, final String connectionString) {
|
||||
this.redisMode = Objects.requireNonNull(redisMode);
|
||||
this.connectionString = Objects.requireNonNull(connectionString);
|
||||
}
|
||||
|
||||
public RedisType getRedisMode() {
|
||||
return redisMode;
|
||||
}
|
||||
|
||||
public String getConnectionString() {
|
||||
return connectionString;
|
||||
}
|
||||
|
||||
public String getSentinelMaster() {
|
||||
return sentinelMaster;
|
||||
}
|
||||
|
||||
public void setSentinelMaster(String sentinelMaster) {
|
||||
this.sentinelMaster = sentinelMaster;
|
||||
}
|
||||
|
||||
public String getSentinelPassword() {
|
||||
return sentinelPassword;
|
||||
}
|
||||
|
||||
public void setSentinelPassword(String sentinelPassword) {
|
||||
this.sentinelPassword = sentinelPassword;
|
||||
}
|
||||
|
||||
public int getDbIndex() {
|
||||
return dbIndex;
|
||||
}
|
||||
|
||||
public void setDbIndex(int dbIndex) {
|
||||
this.dbIndex = dbIndex;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public void setPassword(String password) {
|
||||
this.password = password;
|
||||
}
|
||||
|
||||
public int getTimeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
|
||||
public int getClusterMaxRedirects() {
|
||||
return clusterMaxRedirects;
|
||||
}
|
||||
|
||||
public void setClusterMaxRedirects(int clusterMaxRedirects) {
|
||||
this.clusterMaxRedirects = clusterMaxRedirects;
|
||||
}
|
||||
|
||||
public int getPoolMaxTotal() {
|
||||
return poolMaxTotal;
|
||||
}
|
||||
|
||||
public void setPoolMaxTotal(int poolMaxTotal) {
|
||||
this.poolMaxTotal = poolMaxTotal;
|
||||
}
|
||||
|
||||
public int getPoolMaxIdle() {
|
||||
return poolMaxIdle;
|
||||
}
|
||||
|
||||
public void setPoolMaxIdle(int poolMaxIdle) {
|
||||
this.poolMaxIdle = poolMaxIdle;
|
||||
}
|
||||
|
||||
public int getPoolMinIdle() {
|
||||
return poolMinIdle;
|
||||
}
|
||||
|
||||
public void setPoolMinIdle(int poolMinIdle) {
|
||||
this.poolMinIdle = poolMinIdle;
|
||||
}
|
||||
|
||||
public boolean getBlockWhenExhausted() {
|
||||
return blockWhenExhausted;
|
||||
}
|
||||
|
||||
public void setBlockWhenExhausted(boolean blockWhenExhausted) {
|
||||
this.blockWhenExhausted = blockWhenExhausted;
|
||||
}
|
||||
|
||||
public Duration getMaxWaitTime() {
|
||||
return maxWaitTime;
|
||||
}
|
||||
|
||||
public void setMaxWaitTime(Duration maxWaitTime) {
|
||||
this.maxWaitTime = maxWaitTime;
|
||||
}
|
||||
|
||||
public Duration getMinEvictableIdleTime() {
|
||||
return minEvictableIdleTime;
|
||||
}
|
||||
|
||||
public void setMinEvictableIdleTime(Duration minEvictableIdleTime) {
|
||||
this.minEvictableIdleTime = minEvictableIdleTime;
|
||||
}
|
||||
|
||||
public Duration getTimeBetweenEvictionRuns() {
|
||||
return timeBetweenEvictionRuns;
|
||||
}
|
||||
|
||||
public void setTimeBetweenEvictionRuns(Duration timeBetweenEvictionRuns) {
|
||||
this.timeBetweenEvictionRuns = timeBetweenEvictionRuns;
|
||||
}
|
||||
|
||||
public int getNumTestsPerEvictionRun() {
|
||||
return numTestsPerEvictionRun;
|
||||
}
|
||||
|
||||
public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
|
||||
this.numTestsPerEvictionRun = numTestsPerEvictionRun;
|
||||
}
|
||||
|
||||
public boolean getTestOnCreate() {
|
||||
return testOnCreate;
|
||||
}
|
||||
|
||||
public void setTestOnCreate(boolean testOnCreate) {
|
||||
this.testOnCreate = testOnCreate;
|
||||
}
|
||||
|
||||
public boolean getTestOnBorrow() {
|
||||
return testOnBorrow;
|
||||
}
|
||||
|
||||
public void setTestOnBorrow(boolean testOnBorrow) {
|
||||
this.testOnBorrow = testOnBorrow;
|
||||
}
|
||||
|
||||
public boolean getTestOnReturn() {
|
||||
return testOnReturn;
|
||||
}
|
||||
|
||||
public void setTestOnReturn(boolean testOnReturn) {
|
||||
this.testOnReturn = testOnReturn;
|
||||
}
|
||||
|
||||
public boolean getTestWhenIdle() {
|
||||
return testWhenIdle;
|
||||
}
|
||||
|
||||
public void setTestWhenIdle(boolean testWhenIdle) {
|
||||
this.testWhenIdle = testWhenIdle;
|
||||
}
|
||||
}
|
|
@ -16,37 +16,47 @@
|
|||
*/
|
||||
package org.apache.nifi.redis.util;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.redis.RedisConnectionPool;
|
||||
import org.apache.nifi.redis.RedisType;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.connection.RedisClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.RedisPassword;
|
||||
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
|
||||
import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
|
||||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.MapRecord;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.springframework.lang.Nullable;
|
||||
import redis.clients.jedis.JedisPoolConfig;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class RedisUtils {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(RedisUtils.class);
|
||||
|
||||
// These properties are shared among the controller service(s) and processor(s) that use a RedisConnectionPool
|
||||
|
||||
public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder()
|
||||
|
@ -301,15 +311,44 @@ public class RedisUtils {
|
|||
REDIS_CONNECTION_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
public static RedisConfig createRedisConfig(final PropertyContext context) {
|
||||
final RedisType redisType = RedisType.fromDisplayName(context.getProperty(RedisUtils.REDIS_MODE).getValue());
|
||||
final String connectString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
|
||||
|
||||
public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger, final SSLContext sslContext) {
|
||||
final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
|
||||
final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
|
||||
final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
|
||||
final String password = context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final String sentinelPassword = context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue();
|
||||
final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
|
||||
final RedisConfig redisConfig = new RedisConfig(redisType, connectString);
|
||||
redisConfig.setSentinelMaster(context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue());
|
||||
redisConfig.setDbIndex(context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger());
|
||||
redisConfig.setPassword(context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue());
|
||||
redisConfig.setSentinelPassword(context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue());
|
||||
redisConfig.setTimeout(context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
|
||||
redisConfig.setClusterMaxRedirects(context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger());
|
||||
redisConfig.setPoolMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger());
|
||||
redisConfig.setPoolMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger());
|
||||
redisConfig.setPoolMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger());
|
||||
redisConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean());
|
||||
redisConfig.setMaxWaitTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS)));
|
||||
redisConfig.setMinEvictableIdleTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS)));
|
||||
redisConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS)));
|
||||
redisConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger());
|
||||
redisConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean());
|
||||
redisConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean());
|
||||
redisConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean());
|
||||
redisConfig.setTestWhenIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean());
|
||||
return redisConfig;
|
||||
}
|
||||
|
||||
public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final SSLContext sslContext) {
|
||||
return createConnectionFactory(createRedisConfig(context), sslContext);
|
||||
}
|
||||
|
||||
public static JedisConnectionFactory createConnectionFactory(final RedisConfig redisConfig, final SSLContext sslContext) {
|
||||
final RedisType redisMode = redisConfig.getRedisMode();
|
||||
final String connectionString = redisConfig.getConnectionString();
|
||||
final Integer dbIndex = redisConfig.getDbIndex();
|
||||
final String password = redisConfig.getPassword();
|
||||
final String sentinelPassword = redisConfig.getSentinelPassword();
|
||||
final Integer timeout = redisConfig.getTimeout();
|
||||
final JedisPoolConfig poolConfig = createJedisPoolConfig(redisConfig);
|
||||
|
||||
JedisClientConfiguration.JedisClientConfigurationBuilder builder = JedisClientConfiguration.builder()
|
||||
.connectTimeout(Duration.ofMillis(timeout))
|
||||
|
@ -328,8 +367,8 @@ public class RedisUtils {
|
|||
final JedisClientConfiguration jedisClientConfiguration = builder.build();
|
||||
JedisConnectionFactory connectionFactory;
|
||||
|
||||
if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
|
||||
logger.info("Connecting to Redis in standalone mode at " + connectionString);
|
||||
if (RedisType.STANDALONE == redisMode) {
|
||||
LOGGER.info("Connecting to Redis in standalone mode at " + connectionString);
|
||||
final String[] hostAndPortSplit = connectionString.split("[:]");
|
||||
final String host = hostAndPortSplit[0].trim();
|
||||
final Integer port = Integer.parseInt(hostAndPortSplit[1].trim());
|
||||
|
@ -338,32 +377,32 @@ public class RedisUtils {
|
|||
|
||||
connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration);
|
||||
|
||||
} else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) {
|
||||
} else if (RedisType.SENTINEL == redisMode) {
|
||||
final String[] sentinels = connectionString.split("[,]");
|
||||
final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue();
|
||||
final String sentinelMaster = redisConfig.getSentinelMaster();
|
||||
final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels)));
|
||||
enrichRedisConfiguration(sentinelConfiguration, dbIndex, password, sentinelPassword);
|
||||
|
||||
logger.info("Connecting to Redis in sentinel mode...");
|
||||
logger.info("Redis master = " + sentinelMaster);
|
||||
LOGGER.info("Connecting to Redis in sentinel mode...");
|
||||
LOGGER.info("Redis master = " + sentinelMaster);
|
||||
|
||||
for (final String sentinel : sentinels) {
|
||||
logger.info("Redis sentinel at " + sentinel);
|
||||
LOGGER.info("Redis sentinel at " + sentinel);
|
||||
}
|
||||
|
||||
connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration);
|
||||
|
||||
} else {
|
||||
final String[] clusterNodes = connectionString.split("[,]");
|
||||
final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger();
|
||||
final Integer maxRedirects = redisConfig.getClusterMaxRedirects();
|
||||
|
||||
final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes));
|
||||
enrichRedisConfiguration(clusterConfiguration, dbIndex, password, sentinelPassword);
|
||||
clusterConfiguration.setMaxRedirects(maxRedirects);
|
||||
|
||||
logger.info("Connecting to Redis in clustered mode...");
|
||||
LOGGER.info("Connecting to Redis in clustered mode...");
|
||||
for (final String clusterNode : clusterNodes) {
|
||||
logger.info("Redis cluster node at " + clusterNode);
|
||||
LOGGER.info("Redis cluster node at " + clusterNode);
|
||||
}
|
||||
|
||||
connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration);
|
||||
|
@ -397,20 +436,20 @@ public class RedisUtils {
|
|||
}
|
||||
}
|
||||
|
||||
private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) {
|
||||
private static JedisPoolConfig createJedisPoolConfig(final RedisConfig redisConfig) {
|
||||
final JedisPoolConfig poolConfig = new JedisPoolConfig();
|
||||
poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger());
|
||||
poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger());
|
||||
poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger());
|
||||
poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean());
|
||||
poolConfig.setMaxWait(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asDuration());
|
||||
poolConfig.setMinEvictableIdleTime(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asDuration());
|
||||
poolConfig.setTimeBetweenEvictionRuns(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asDuration());
|
||||
poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger());
|
||||
poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean());
|
||||
poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean());
|
||||
poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean());
|
||||
poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean());
|
||||
poolConfig.setMaxTotal(redisConfig.getPoolMaxTotal());
|
||||
poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle());
|
||||
poolConfig.setMinIdle(redisConfig.getPoolMinIdle());
|
||||
poolConfig.setBlockWhenExhausted(redisConfig.getBlockWhenExhausted());
|
||||
poolConfig.setMaxWait(redisConfig.getMaxWaitTime());
|
||||
poolConfig.setMinEvictableIdleTime(redisConfig.getMinEvictableIdleTime());
|
||||
poolConfig.setTimeBetweenEvictionRuns(redisConfig.getTimeBetweenEvictionRuns());
|
||||
poolConfig.setNumTestsPerEvictionRun(redisConfig.getNumTestsPerEvictionRun());
|
||||
poolConfig.setTestOnCreate(redisConfig.getTestOnCreate());
|
||||
poolConfig.setTestOnBorrow(redisConfig.getTestOnBorrow());
|
||||
poolConfig.setTestOnReturn(redisConfig.getTestOnReturn());
|
||||
poolConfig.setTestWhileIdle(redisConfig.getTestWhenIdle());
|
||||
return poolConfig;
|
||||
}
|
||||
|
||||
|
@ -482,4 +521,39 @@ public class RedisUtils {
|
|||
}
|
||||
}
|
||||
|
||||
public static RedisTemplate<String, String> createRedisTemplateForStreams(final RedisConnectionFactory connectionFactory) {
|
||||
final StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<String, MapRecord<String, String, byte[]>> builder =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.hashValueSerializer(RedisSerializer.byteArray());
|
||||
|
||||
final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, byte[]>> containerOptions = builder.build();
|
||||
return createRedisTemplateForStreams(connectionFactory, containerOptions);
|
||||
}
|
||||
|
||||
public static RedisTemplate<String, String> createRedisTemplateForStreams(
|
||||
final RedisConnectionFactory connectionFactory,
|
||||
final StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, byte[]>> containerOptions) {
|
||||
final RedisTemplate<String,String> redisTemplate = new RedisTemplate<>();
|
||||
redisTemplate.setKeySerializer(containerOptions.getKeySerializer());
|
||||
redisTemplate.setValueSerializer(containerOptions.getKeySerializer());
|
||||
redisTemplate.setHashKeySerializer(containerOptions.getHashKeySerializer());
|
||||
redisTemplate.setHashValueSerializer(containerOptions.getHashValueSerializer());
|
||||
redisTemplate.setConnectionFactory(connectionFactory);
|
||||
redisTemplate.afterPropertiesSet();
|
||||
return redisTemplate;
|
||||
}
|
||||
|
||||
public static RedisTemplate<String, String> createRedisTemplateForKeyValues(final RedisConnectionFactory connectionFactory) {
|
||||
final RedisTemplate<String, String> redisTemplate = new StringRedisTemplate(connectionFactory);
|
||||
redisTemplate.afterPropertiesSet();
|
||||
return redisTemplate;
|
||||
}
|
||||
|
||||
public static RedisTemplate<String, byte[]> createRedisTemplateForPubSub(final RedisConnectionFactory connectionFactory) {
|
||||
final RedisTemplate<String, byte[]> redisTemplate = new RedisTemplate<>();
|
||||
redisTemplate.setConnectionFactory(connectionFactory);
|
||||
redisTemplate.setValueSerializer(RedisSerializer.byteArray());
|
||||
redisTemplate.afterPropertiesSet();
|
||||
return redisTemplate;
|
||||
}
|
||||
}
|
|
@ -31,6 +31,7 @@
|
|||
</properties>
|
||||
|
||||
<modules>
|
||||
<module>nifi-redis-utils</module>
|
||||
<module>nifi-redis-service-api</module>
|
||||
<module>nifi-redis-service-api-nar</module>
|
||||
<module>nifi-redis-extensions</module>
|
||||
|
|
Loading…
Reference in New Issue