diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml index dcd7ee6c81..85fb6c6dcf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/state-management.xml @@ -95,6 +95,10 @@ Password - The password used to authenticate to the Redis server. See the requirepass property in redis.conf. + Enable TLS - If true, the Redis connection will be configured to use TLS, using the keystore and truststore settings configured in + nifi.properties. This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode. + If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure (default false). + Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout). A negative value indicates that there is no limit. @@ -131,4 +135,4 @@ --> - \ No newline at end of file + diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java index 68169f9c39..83937fb5d9 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/service/RedisConnectionPoolService.java @@ -29,9 +29,11 @@ import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.RedisType; import org.apache.nifi.redis.util.RedisUtils; +import org.apache.nifi.ssl.SSLContextService; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import javax.net.ssl.SSLContext; import java.util.Collection; import java.util.List; @@ -42,6 +44,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem private volatile PropertyContext context; private volatile RedisType redisType; private volatile JedisConnectionFactory connectionFactory; + private volatile SSLContext sslContext; @Override protected List getSupportedPropertyDescriptors() { @@ -56,6 +59,10 @@ public class RedisConnectionPoolService extends AbstractControllerService implem @OnEnabled public void onEnabled(final ConfigurationContext context) { this.context = context; + if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) { + final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + this.sslContext = sslContextService.createContext(); + } final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue(); this.redisType = RedisType.fromDisplayName(redisMode); @@ -68,6 +75,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem connectionFactory = null; redisType = null; context = null; + sslContext = null; } } @@ -81,7 +89,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem if (connectionFactory == null) { synchronized (this) { if (connectionFactory == null) { - connectionFactory = RedisUtils.createConnectionFactory(context, getLogger()); + connectionFactory = RedisUtils.createConnectionFactory(context, getLogger(), sslContext); } } } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java index c23bb81f14..6d552dc771 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/state/RedisStateProvider.java @@ -33,6 +33,7 @@ import org.apache.nifi.redis.util.RedisUtils; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import javax.net.ssl.SSLContext; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -57,11 +58,22 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements .defaultValue("nifi/components/") .addValidator(StandardValidators.NON_BLANK_VALIDATOR) .build(); + public static final PropertyDescriptor ENABLE_TLS = new PropertyDescriptor.Builder() + .name("Enable TLS") + .displayName("Enable TLS") + .description("If true, the Redis connection will be configured to use TLS, using the keystore and truststore settings configured in " + + "nifi.properties. This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode. " + + "If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure.") + .required(true) + .defaultValue("false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); static final List STATE_PROVIDER_PROPERTIES; static { final List props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS); props.add(KEY_PREFIX); + props.add(ENABLE_TLS); STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props); } @@ -69,6 +81,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements private String keyPrefix; private ComponentLog logger; private PropertyContext context; + private SSLContext sslContext; private volatile boolean enabled; private volatile JedisConnectionFactory connectionFactory; @@ -76,8 +89,11 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe(); @Override - public final void initialize(final StateProviderInitializationContext context) throws IOException { + public final void initialize(final StateProviderInitializationContext context) { this.context = context; + if (context.getProperty(ENABLE_TLS).asBoolean()) { + this.sslContext = context.getSSLContext(); + } this.identifier = context.getIdentifier(); this.logger = context.getLogger(); @@ -98,7 +114,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements final List results = new ArrayList<>(RedisUtils.validate(validationContext)); final RedisType redisType = RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue()); - if (redisType != null && redisType == RedisType.CLUSTER) { + if (redisType == RedisType.CLUSTER) { results.add(new ValidationResult.Builder() .subject(RedisUtils.REDIS_MODE.getDisplayName()) .valid(false) @@ -106,6 +122,16 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements + " is configured in clustered mode, and this service requires a non-clustered Redis") .build()); } + final boolean enableTls = validationContext.getProperty(ENABLE_TLS).asBoolean(); + if (enableTls && sslContext == null) { + results.add(new ValidationResult.Builder() + .subject(ENABLE_TLS.getDisplayName()) + .valid(false) + .explanation(ENABLE_TLS.getDisplayName() + + " is set to 'true', but Apache NiFi is not secured. This state provider can only use a TLS-enabled connection " + + "if a keystore and truststore are provided in nifi.properties.") + .build()); + } return results; } @@ -274,7 +300,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements // visible for testing synchronized RedisConnection getRedis() { if (connectionFactory == null) { - connectionFactory = RedisUtils.createConnectionFactory(context, logger); + connectionFactory = RedisUtils.createConnectionFactory(context, logger, sslContext); } return connectionFactory.getConnection(); diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java index 1b5bab85e9..dd0197726a 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/main/java/org/apache/nifi/redis/util/RedisUtils.java @@ -26,7 +26,6 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.redis.RedisType; import org.apache.nifi.ssl.RestrictedSSLContextService; -import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.StringUtils; import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisConfiguration; @@ -273,7 +272,7 @@ public class RedisUtils { } - public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger) { + public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger, final SSLContext sslContext) { final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue(); final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue(); final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger(); @@ -288,9 +287,7 @@ public class RedisUtils { .poolConfig(poolConfig) .and(); - if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) { - final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SSLContext sslContext = sslContextService.createContext(); + if (sslContext != null) { builder = builder.useSsl() .sslParameters(sslContext.getSupportedSSLParameters()) .sslSocketFactory(sslContext.getSocketFactory()) diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java index 7223e65628..40d4c42132 100644 --- a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/service/TestRedisConnectionPoolService.java @@ -20,6 +20,7 @@ import org.apache.nifi.redis.RedisConnectionPool; import org.apache.nifi.redis.util.RedisUtils; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.ssl.RestrictedSSLContextService; +import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockConfigurationContext; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.StandardProcessorTestRunner; @@ -118,7 +119,12 @@ public class TestRedisConnectionPoolService { MockProcessContext processContext = ((StandardProcessorTestRunner) testRunner).getProcessContext(); MockConfigurationContext configContext = new MockConfigurationContext(processContext.getControllerServices() .get(redisService.getIdentifier()).getProperties(), processContext); - JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger()); + SSLContext providedSslContext = null; + if (configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) { + final SSLContextService sslContextService = configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + providedSslContext = sslContextService.createContext(); + } + JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), providedSslContext); return connectionFactory; } diff --git a/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java new file mode 100644 index 0000000000..45fd899768 --- /dev/null +++ b/nifi-nar-bundles/nifi-redis-bundle/nifi-redis-extensions/src/test/java/org/apache/nifi/redis/state/TestRedisStateProvider.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.redis.state; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.state.StateProviderInitializationContext; +import org.apache.nifi.redis.util.RedisUtils; +import org.apache.nifi.util.MockPropertyValue; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.util.Collection; + +public class TestRedisStateProvider { + + private RedisStateProvider redisStateProvider; + private StateProviderInitializationContext context; + private ValidationContext validationContext; + + @Before + public void init() { + context = Mockito.mock(StateProviderInitializationContext.class); + redisStateProvider = new RedisStateProvider(); + validationContext = Mockito.mock(ValidationContext.class); + + // Set up mock state provider init context + Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new MockPropertyValue("/nifi/components/")); + + // Set up mock validation context + Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new MockPropertyValue("localhost:6379")); + Mockito.when(validationContext.getProperty(RedisUtils.REDIS_MODE)).thenReturn(new MockPropertyValue("Standalone")); + Mockito.when(validationContext.getProperty(RedisUtils.DATABASE)).thenReturn(new MockPropertyValue("0")); + } + + private void enableTls(boolean enable) { + Mockito.when(validationContext.getProperty(RedisStateProvider.ENABLE_TLS)).thenReturn(new MockPropertyValue(String.valueOf(enable))); + Mockito.when(context.getProperty(RedisStateProvider.ENABLE_TLS)).thenReturn(new MockPropertyValue(String.valueOf(enable))); + + if (enable) { + SSLContext sslContext = Mockito.mock(SSLContext.class); + Mockito.when(context.getSSLContext()).thenReturn(sslContext); + } + } + + @Test + public void customValidate_enabledTlsSuccess() throws IOException { + this.enableTls(true); + + redisStateProvider.initialize(context); + + Collection results = redisStateProvider.customValidate(validationContext); + Assert.assertTrue(results.isEmpty()); + } + + @Test + public void customValidate_disableTlsSuccess() throws IOException { + this.enableTls(false); + + redisStateProvider.initialize(context); + + Collection results = redisStateProvider.customValidate(validationContext); + Assert.assertTrue(results.isEmpty()); + } + + @Test + public void customValidate_enableTlsButNoSslContext() throws IOException { + this.enableTls(true); + + Mockito.when(context.getSSLContext()).thenReturn(null); + + redisStateProvider.initialize(context); + + Collection results = redisStateProvider.customValidate(validationContext); + Assert.assertEquals(1, results.size()); + } +}