diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml index 82a3b96607..bf1a5f2793 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/pom.xml @@ -38,6 +38,11 @@ 2.0.0-SNAPSHOT provided + + org.apache.nifi + nifi-redis-utils + 2.0.0-SNAPSHOT + org.springframework.data spring-data-redis diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java index 70e5ca7f5f..abaa47c19f 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java @@ -90,7 +90,7 @@ RedisConnectionPoolService extends AbstractControllerService implements RedisCon if (connectionFactory == null) { synchronized (this) { if (connectionFactory == null) { - connectionFactory = RedisUtils.createConnectionFactory(context, getLogger(), sslContext); + connectionFactory = RedisUtils.createConnectionFactory(context, sslContext); } } } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java index b684a77d5d..4792523eef 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java @@ -344,7 +344,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements // visible for testing synchronized RedisConnection getRedis() { if (connectionFactory == null) { - connectionFactory = RedisUtils.createConnectionFactory(context, logger, sslContext); + connectionFactory = RedisUtils.createConnectionFactory(context, sslContext); } return connectionFactory.getConnection(); diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java index 5ec27d1996..c77e12e4fa 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java @@ -126,7 +126,7 @@ public class TestRedisConnectionPoolService { final SSLContextService sslContextService = configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); providedSslContext = sslContextService.createContext(); } - JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), providedSslContext); + JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, providedSslContext); return connectionFactory; } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml new file mode 100644 index 0000000000..cd3e1ee2f8 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/pom.xml @@ -0,0 +1,67 @@ + + + + 4.0.0 + + + org.apache.nifi + nifi-redis-bundle + 2.0.0-SNAPSHOT + + + nifi-redis-utils + jar + + + + org.apache.nifi + nifi-api + + + org.apache.nifi + nifi-redis-service-api + 2.0.0-SNAPSHOT + provided + + + org.apache.nifi + nifi-ssl-context-service-api + provided + + + org.springframework.data + spring-data-redis + ${spring.data.redis.version} + provided + + + redis.clients + jedis + ${jedis.version} + provided + + + org.apache.nifi + nifi-utils + 2.0.0-SNAPSHOT + + + org.testcontainers + junit-jupiter + test + + + diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisAction.java similarity index 100% rename from nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisAction.java rename to nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisAction.java diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java new file mode 100644 index 0000000000..c5da310093 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisConfig.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.util; + +import org.apache.nifi.redis.RedisType; + +import java.time.Duration; +import java.util.Objects; + +public class RedisConfig { + + private final RedisType redisMode; + private final String connectionString; + + private String sentinelMaster; + private String sentinelPassword; + private String password; + + private int dbIndex = 0; + private int timeout = 10000; + private int clusterMaxRedirects = 5; + + private int poolMaxTotal = 8; + private int poolMaxIdle = 8; + private int poolMinIdle = 0; + private boolean blockWhenExhausted = true; + private Duration maxWaitTime = Duration.ofSeconds(10); + private Duration minEvictableIdleTime = Duration.ofSeconds(60); + private Duration timeBetweenEvictionRuns = Duration.ofSeconds(30); + private int numTestsPerEvictionRun = -1; + private boolean testOnCreate = true; + private boolean testOnBorrow = true; + private boolean testOnReturn = false; + private boolean testWhenIdle = true; + + public RedisConfig(final RedisType redisMode, final String connectionString) { + this.redisMode = Objects.requireNonNull(redisMode); + this.connectionString = Objects.requireNonNull(connectionString); + } + + public RedisType getRedisMode() { + return redisMode; + } + + public String getConnectionString() { + return connectionString; + } + + public String getSentinelMaster() { + return sentinelMaster; + } + + public void setSentinelMaster(String sentinelMaster) { + this.sentinelMaster = sentinelMaster; + } + + public String getSentinelPassword() { + return sentinelPassword; + } + + public void setSentinelPassword(String sentinelPassword) { + this.sentinelPassword = sentinelPassword; + } + + public int getDbIndex() { + return dbIndex; + } + + public void setDbIndex(int dbIndex) { + this.dbIndex = dbIndex; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public int getClusterMaxRedirects() { + return clusterMaxRedirects; + } + + public void setClusterMaxRedirects(int clusterMaxRedirects) { + this.clusterMaxRedirects = clusterMaxRedirects; + } + + public int getPoolMaxTotal() { + return poolMaxTotal; + } + + public void setPoolMaxTotal(int poolMaxTotal) { + this.poolMaxTotal = poolMaxTotal; + } + + public int getPoolMaxIdle() { + return poolMaxIdle; + } + + public void setPoolMaxIdle(int poolMaxIdle) { + this.poolMaxIdle = poolMaxIdle; + } + + public int getPoolMinIdle() { + return poolMinIdle; + } + + public void setPoolMinIdle(int poolMinIdle) { + this.poolMinIdle = poolMinIdle; + } + + public boolean getBlockWhenExhausted() { + return blockWhenExhausted; + } + + public void setBlockWhenExhausted(boolean blockWhenExhausted) { + this.blockWhenExhausted = blockWhenExhausted; + } + + public Duration getMaxWaitTime() { + return maxWaitTime; + } + + public void setMaxWaitTime(Duration maxWaitTime) { + this.maxWaitTime = maxWaitTime; + } + + public Duration getMinEvictableIdleTime() { + return minEvictableIdleTime; + } + + public void setMinEvictableIdleTime(Duration minEvictableIdleTime) { + this.minEvictableIdleTime = minEvictableIdleTime; + } + + public Duration getTimeBetweenEvictionRuns() { + return timeBetweenEvictionRuns; + } + + public void setTimeBetweenEvictionRuns(Duration timeBetweenEvictionRuns) { + this.timeBetweenEvictionRuns = timeBetweenEvictionRuns; + } + + public int getNumTestsPerEvictionRun() { + return numTestsPerEvictionRun; + } + + public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) { + this.numTestsPerEvictionRun = numTestsPerEvictionRun; + } + + public boolean getTestOnCreate() { + return testOnCreate; + } + + public void setTestOnCreate(boolean testOnCreate) { + this.testOnCreate = testOnCreate; + } + + public boolean getTestOnBorrow() { + return testOnBorrow; + } + + public void setTestOnBorrow(boolean testOnBorrow) { + this.testOnBorrow = testOnBorrow; + } + + public boolean getTestOnReturn() { + return testOnReturn; + } + + public void setTestOnReturn(boolean testOnReturn) { + this.testOnReturn = testOnReturn; + } + + public boolean getTestWhenIdle() { + return testWhenIdle; + } + + public void setTestWhenIdle(boolean testWhenIdle) { + this.testWhenIdle = testWhenIdle; + } +} diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java similarity index 76% rename from nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java rename to nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java index f72ba136f5..51b17e3cd0 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-utils/src/main/java/org/apache/nifi/redis/util/RedisUtils.java @@ -16,37 +16,47 @@ */ package org.apache.nifi.redis.util; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.TimeUnit; -import javax.net.ssl.SSLContext; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisType; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisConfiguration; +import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisPassword; import org.springframework.data.redis.connection.RedisSentinelConfiguration; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.jedis.JedisClientConfiguration; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.lang.Nullable; import redis.clients.jedis.JedisPoolConfig; +import javax.net.ssl.SSLContext; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.TimeUnit; + public class RedisUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(RedisUtils.class); + // These properties are shared among the controller service(s) and processor(s) that use a RedisConnectionPool public static final PropertyDescriptor REDIS_CONNECTION_POOL = new PropertyDescriptor.Builder() @@ -301,15 +311,44 @@ public class RedisUtils { REDIS_CONNECTION_PROPERTY_DESCRIPTORS = Collections.unmodifiableList(props); } + public static RedisConfig createRedisConfig(final PropertyContext context) { + final RedisType redisType = RedisType.fromDisplayName(context.getProperty(RedisUtils.REDIS_MODE).getValue()); + final String connectString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); - public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger, final SSLContext sslContext) { - final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue(); - final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); - final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger(); - final String password = context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue(); - final String sentinelPassword = context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue(); - final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - final JedisPoolConfig poolConfig = createJedisPoolConfig(context); + final RedisConfig redisConfig = new RedisConfig(redisType, connectString); + redisConfig.setSentinelMaster(context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue()); + redisConfig.setDbIndex(context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger()); + redisConfig.setPassword(context.getProperty(RedisUtils.PASSWORD).evaluateAttributeExpressions().getValue()); + redisConfig.setSentinelPassword(context.getProperty(RedisUtils.SENTINEL_PASSWORD).evaluateAttributeExpressions().getValue()); + redisConfig.setTimeout(context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue()); + redisConfig.setClusterMaxRedirects(context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger()); + redisConfig.setPoolMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger()); + redisConfig.setPoolMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger()); + redisConfig.setPoolMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger()); + redisConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean()); + redisConfig.setMaxWaitTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS))); + redisConfig.setMinEvictableIdleTime(Duration.ofMillis(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asTimePeriod(TimeUnit.MILLISECONDS))); + redisConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asTimePeriod(TimeUnit.MILLISECONDS))); + redisConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger()); + redisConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean()); + redisConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean()); + redisConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean()); + redisConfig.setTestWhenIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean()); + return redisConfig; + } + + public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final SSLContext sslContext) { + return createConnectionFactory(createRedisConfig(context), sslContext); + } + + public static JedisConnectionFactory createConnectionFactory(final RedisConfig redisConfig, final SSLContext sslContext) { + final RedisType redisMode = redisConfig.getRedisMode(); + final String connectionString = redisConfig.getConnectionString(); + final Integer dbIndex = redisConfig.getDbIndex(); + final String password = redisConfig.getPassword(); + final String sentinelPassword = redisConfig.getSentinelPassword(); + final Integer timeout = redisConfig.getTimeout(); + final JedisPoolConfig poolConfig = createJedisPoolConfig(redisConfig); JedisClientConfiguration.JedisClientConfigurationBuilder builder = JedisClientConfiguration.builder() .connectTimeout(Duration.ofMillis(timeout)) @@ -328,8 +367,8 @@ public class RedisUtils { final JedisClientConfiguration jedisClientConfiguration = builder.build(); JedisConnectionFactory connectionFactory; - if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) { - logger.info("Connecting to Redis in standalone mode at " + connectionString); + if (RedisType.STANDALONE == redisMode) { + LOGGER.info("Connecting to Redis in standalone mode at " + connectionString); final String[] hostAndPortSplit = connectionString.split("[:]"); final String host = hostAndPortSplit[0].trim(); final Integer port = Integer.parseInt(hostAndPortSplit[1].trim()); @@ -338,32 +377,32 @@ public class RedisUtils { connectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration); - } else if (RedisUtils.REDIS_MODE_SENTINEL.getValue().equals(redisMode)) { + } else if (RedisType.SENTINEL == redisMode) { final String[] sentinels = connectionString.split("[,]"); - final String sentinelMaster = context.getProperty(RedisUtils.SENTINEL_MASTER).evaluateAttributeExpressions().getValue(); + final String sentinelMaster = redisConfig.getSentinelMaster(); final RedisSentinelConfiguration sentinelConfiguration = new RedisSentinelConfiguration(sentinelMaster, new HashSet<>(getTrimmedValues(sentinels))); enrichRedisConfiguration(sentinelConfiguration, dbIndex, password, sentinelPassword); - logger.info("Connecting to Redis in sentinel mode..."); - logger.info("Redis master = " + sentinelMaster); + LOGGER.info("Connecting to Redis in sentinel mode..."); + LOGGER.info("Redis master = " + sentinelMaster); for (final String sentinel : sentinels) { - logger.info("Redis sentinel at " + sentinel); + LOGGER.info("Redis sentinel at " + sentinel); } connectionFactory = new JedisConnectionFactory(sentinelConfiguration, jedisClientConfiguration); } else { final String[] clusterNodes = connectionString.split("[,]"); - final Integer maxRedirects = context.getProperty(RedisUtils.CLUSTER_MAX_REDIRECTS).asInteger(); + final Integer maxRedirects = redisConfig.getClusterMaxRedirects(); final RedisClusterConfiguration clusterConfiguration = new RedisClusterConfiguration(getTrimmedValues(clusterNodes)); enrichRedisConfiguration(clusterConfiguration, dbIndex, password, sentinelPassword); clusterConfiguration.setMaxRedirects(maxRedirects); - logger.info("Connecting to Redis in clustered mode..."); + LOGGER.info("Connecting to Redis in clustered mode..."); for (final String clusterNode : clusterNodes) { - logger.info("Redis cluster node at " + clusterNode); + LOGGER.info("Redis cluster node at " + clusterNode); } connectionFactory = new JedisConnectionFactory(clusterConfiguration, jedisClientConfiguration); @@ -397,20 +436,20 @@ public class RedisUtils { } } - private static JedisPoolConfig createJedisPoolConfig(final PropertyContext context) { + private static JedisPoolConfig createJedisPoolConfig(final RedisConfig redisConfig) { final JedisPoolConfig poolConfig = new JedisPoolConfig(); - poolConfig.setMaxTotal(context.getProperty(RedisUtils.POOL_MAX_TOTAL).asInteger()); - poolConfig.setMaxIdle(context.getProperty(RedisUtils.POOL_MAX_IDLE).asInteger()); - poolConfig.setMinIdle(context.getProperty(RedisUtils.POOL_MIN_IDLE).asInteger()); - poolConfig.setBlockWhenExhausted(context.getProperty(RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED).asBoolean()); - poolConfig.setMaxWait(context.getProperty(RedisUtils.POOL_MAX_WAIT_TIME).asDuration()); - poolConfig.setMinEvictableIdleTime(context.getProperty(RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME).asDuration()); - poolConfig.setTimeBetweenEvictionRuns(context.getProperty(RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS).asDuration()); - poolConfig.setNumTestsPerEvictionRun(context.getProperty(RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN).asInteger()); - poolConfig.setTestOnCreate(context.getProperty(RedisUtils.POOL_TEST_ON_CREATE).asBoolean()); - poolConfig.setTestOnBorrow(context.getProperty(RedisUtils.POOL_TEST_ON_BORROW).asBoolean()); - poolConfig.setTestOnReturn(context.getProperty(RedisUtils.POOL_TEST_ON_RETURN).asBoolean()); - poolConfig.setTestWhileIdle(context.getProperty(RedisUtils.POOL_TEST_WHILE_IDLE).asBoolean()); + poolConfig.setMaxTotal(redisConfig.getPoolMaxTotal()); + poolConfig.setMaxIdle(redisConfig.getPoolMaxIdle()); + poolConfig.setMinIdle(redisConfig.getPoolMinIdle()); + poolConfig.setBlockWhenExhausted(redisConfig.getBlockWhenExhausted()); + poolConfig.setMaxWait(redisConfig.getMaxWaitTime()); + poolConfig.setMinEvictableIdleTime(redisConfig.getMinEvictableIdleTime()); + poolConfig.setTimeBetweenEvictionRuns(redisConfig.getTimeBetweenEvictionRuns()); + poolConfig.setNumTestsPerEvictionRun(redisConfig.getNumTestsPerEvictionRun()); + poolConfig.setTestOnCreate(redisConfig.getTestOnCreate()); + poolConfig.setTestOnBorrow(redisConfig.getTestOnBorrow()); + poolConfig.setTestOnReturn(redisConfig.getTestOnReturn()); + poolConfig.setTestWhileIdle(redisConfig.getTestWhenIdle()); return poolConfig; } @@ -482,4 +521,39 @@ public class RedisUtils { } } + public static RedisTemplate createRedisTemplateForStreams(final RedisConnectionFactory connectionFactory) { + final StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder> builder = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder() + .hashValueSerializer(RedisSerializer.byteArray()); + + final StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions = builder.build(); + return createRedisTemplateForStreams(connectionFactory, containerOptions); + } + + public static RedisTemplate createRedisTemplateForStreams( + final RedisConnectionFactory connectionFactory, + final StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions) { + final RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setKeySerializer(containerOptions.getKeySerializer()); + redisTemplate.setValueSerializer(containerOptions.getKeySerializer()); + redisTemplate.setHashKeySerializer(containerOptions.getHashKeySerializer()); + redisTemplate.setHashValueSerializer(containerOptions.getHashValueSerializer()); + redisTemplate.setConnectionFactory(connectionFactory); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } + + public static RedisTemplate createRedisTemplateForKeyValues(final RedisConnectionFactory connectionFactory) { + final RedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } + + public static RedisTemplate createRedisTemplateForPubSub(final RedisConnectionFactory connectionFactory) { + final RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(connectionFactory); + redisTemplate.setValueSerializer(RedisSerializer.byteArray()); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } } diff --git a/nifi-nar-bundles/nifi-redis-bundle/pom.xml b/nifi-nar-bundles/nifi-redis-bundle/pom.xml index 06e9c45e47..62d3c833f1 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-redis-bundle/pom.xml @@ -31,6 +31,7 @@ + nifi-redis-utils nifi-redis-service-api nifi-redis-service-api-nar nifi-redis-extensions