mirror of https://github.com/apache/nifi.git
NIFI-8410: Enabling TLS in RedisStateProvider
This closes #4990 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
7d1d536da6
commit
2298953f90
|
@ -95,6 +95,10 @@
|
|||
|
||||
Password - The password used to authenticate to the Redis server. See the requirepass property in redis.conf.
|
||||
|
||||
Enable TLS - If true, the Redis connection will be configured to use TLS, using the keystore and truststore settings configured in
|
||||
nifi.properties. This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode.
|
||||
If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure (default false).
|
||||
|
||||
Pool - Max Total - The maximum number of connections that can be allocated by the pool (checked out to clients, or idle awaiting checkout).
|
||||
A negative value indicates that there is no limit.
|
||||
|
||||
|
@ -131,4 +135,4 @@
|
|||
</cluster-provider>
|
||||
-->
|
||||
|
||||
</stateManagement>
|
||||
</stateManagement>
|
||||
|
|
|
@ -29,9 +29,11 @@ import org.apache.nifi.controller.ConfigurationContext;
|
|||
import org.apache.nifi.redis.RedisConnectionPool;
|
||||
import org.apache.nifi.redis.RedisType;
|
||||
import org.apache.nifi.redis.util.RedisUtils;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -42,6 +44,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
|
|||
private volatile PropertyContext context;
|
||||
private volatile RedisType redisType;
|
||||
private volatile JedisConnectionFactory connectionFactory;
|
||||
private volatile SSLContext sslContext;
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
|
@ -56,6 +59,10 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
|
|||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
this.context = context;
|
||||
if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
|
||||
final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
this.sslContext = sslContextService.createContext();
|
||||
}
|
||||
|
||||
final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
|
||||
this.redisType = RedisType.fromDisplayName(redisMode);
|
||||
|
@ -68,6 +75,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
|
|||
connectionFactory = null;
|
||||
redisType = null;
|
||||
context = null;
|
||||
sslContext = null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,7 +89,7 @@ public class RedisConnectionPoolService extends AbstractControllerService implem
|
|||
if (connectionFactory == null) {
|
||||
synchronized (this) {
|
||||
if (connectionFactory == null) {
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, getLogger());
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, getLogger(), sslContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.nifi.redis.util.RedisUtils;
|
|||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
|
@ -57,11 +58,22 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
.defaultValue("nifi/components/")
|
||||
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||
.build();
|
||||
public static final PropertyDescriptor ENABLE_TLS = new PropertyDescriptor.Builder()
|
||||
.name("Enable TLS")
|
||||
.displayName("Enable TLS")
|
||||
.description("If true, the Redis connection will be configured to use TLS, using the keystore and truststore settings configured in " +
|
||||
"nifi.properties. This means that a TLS-enabled Redis connection is only possible if the Apache NiFi instance is running in secure mode. " +
|
||||
"If this property is false, an insecure Redis connection will be used even if the Apache NiFi instance is secure.")
|
||||
.required(true)
|
||||
.defaultValue("false")
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
static final List<PropertyDescriptor> STATE_PROVIDER_PROPERTIES;
|
||||
static {
|
||||
final List<PropertyDescriptor> props = new ArrayList<>(RedisUtils.REDIS_CONNECTION_PROPERTY_DESCRIPTORS);
|
||||
props.add(KEY_PREFIX);
|
||||
props.add(ENABLE_TLS);
|
||||
STATE_PROVIDER_PROPERTIES = Collections.unmodifiableList(props);
|
||||
}
|
||||
|
||||
|
@ -69,6 +81,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
private String keyPrefix;
|
||||
private ComponentLog logger;
|
||||
private PropertyContext context;
|
||||
private SSLContext sslContext;
|
||||
|
||||
private volatile boolean enabled;
|
||||
private volatile JedisConnectionFactory connectionFactory;
|
||||
|
@ -76,8 +89,11 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
private final RedisStateMapSerDe serDe = new RedisStateMapJsonSerDe();
|
||||
|
||||
@Override
|
||||
public final void initialize(final StateProviderInitializationContext context) throws IOException {
|
||||
public final void initialize(final StateProviderInitializationContext context) {
|
||||
this.context = context;
|
||||
if (context.getProperty(ENABLE_TLS).asBoolean()) {
|
||||
this.sslContext = context.getSSLContext();
|
||||
}
|
||||
this.identifier = context.getIdentifier();
|
||||
this.logger = context.getLogger();
|
||||
|
||||
|
@ -98,7 +114,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
final List<ValidationResult> results = new ArrayList<>(RedisUtils.validate(validationContext));
|
||||
|
||||
final RedisType redisType = RedisType.fromDisplayName(validationContext.getProperty(RedisUtils.REDIS_MODE).getValue());
|
||||
if (redisType != null && redisType == RedisType.CLUSTER) {
|
||||
if (redisType == RedisType.CLUSTER) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(RedisUtils.REDIS_MODE.getDisplayName())
|
||||
.valid(false)
|
||||
|
@ -106,6 +122,16 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
+ " is configured in clustered mode, and this service requires a non-clustered Redis")
|
||||
.build());
|
||||
}
|
||||
final boolean enableTls = validationContext.getProperty(ENABLE_TLS).asBoolean();
|
||||
if (enableTls && sslContext == null) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(ENABLE_TLS.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(ENABLE_TLS.getDisplayName()
|
||||
+ " is set to 'true', but Apache NiFi is not secured. This state provider can only use a TLS-enabled connection " +
|
||||
"if a keystore and truststore are provided in nifi.properties.")
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
@ -274,7 +300,7 @@ public class RedisStateProvider extends AbstractConfigurableComponent implements
|
|||
// visible for testing
|
||||
synchronized RedisConnection getRedis() {
|
||||
if (connectionFactory == null) {
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, logger);
|
||||
connectionFactory = RedisUtils.createConnectionFactory(context, logger, sslContext);
|
||||
}
|
||||
|
||||
return connectionFactory.getConnection();
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.nifi.logging.ComponentLog;
|
|||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.redis.RedisType;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.springframework.data.redis.connection.RedisClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration;
|
||||
|
@ -273,7 +272,7 @@ public class RedisUtils {
|
|||
}
|
||||
|
||||
|
||||
public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger) {
|
||||
public static JedisConnectionFactory createConnectionFactory(final PropertyContext context, final ComponentLog logger, final SSLContext sslContext) {
|
||||
final String redisMode = context.getProperty(RedisUtils.REDIS_MODE).getValue();
|
||||
final String connectionString = context.getProperty(RedisUtils.CONNECTION_STRING).evaluateAttributeExpressions().getValue();
|
||||
final Integer dbIndex = context.getProperty(RedisUtils.DATABASE).evaluateAttributeExpressions().asInteger();
|
||||
|
@ -288,9 +287,7 @@ public class RedisUtils {
|
|||
.poolConfig(poolConfig)
|
||||
.and();
|
||||
|
||||
if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
|
||||
final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final SSLContext sslContext = sslContextService.createContext();
|
||||
if (sslContext != null) {
|
||||
builder = builder.useSsl()
|
||||
.sslParameters(sslContext.getSupportedSSLParameters())
|
||||
.sslSocketFactory(sslContext.getSocketFactory())
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.nifi.redis.RedisConnectionPool;
|
|||
import org.apache.nifi.redis.util.RedisUtils;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.StandardProcessorTestRunner;
|
||||
|
@ -118,7 +119,12 @@ public class TestRedisConnectionPoolService {
|
|||
MockProcessContext processContext = ((StandardProcessorTestRunner) testRunner).getProcessContext();
|
||||
MockConfigurationContext configContext = new MockConfigurationContext(processContext.getControllerServices()
|
||||
.get(redisService.getIdentifier()).getProperties(), processContext);
|
||||
JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger());
|
||||
SSLContext providedSslContext = null;
|
||||
if (configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
|
||||
final SSLContextService sslContextService = configContext.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
providedSslContext = sslContextService.createContext();
|
||||
}
|
||||
JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger(), providedSslContext);
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.redis.state;
|
||||
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.components.state.StateProviderInitializationContext;
|
||||
import org.apache.nifi.redis.util.RedisUtils;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
public class TestRedisStateProvider {
|
||||
|
||||
private RedisStateProvider redisStateProvider;
|
||||
private StateProviderInitializationContext context;
|
||||
private ValidationContext validationContext;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
context = Mockito.mock(StateProviderInitializationContext.class);
|
||||
redisStateProvider = new RedisStateProvider();
|
||||
validationContext = Mockito.mock(ValidationContext.class);
|
||||
|
||||
// Set up mock state provider init context
|
||||
Mockito.when(context.getProperty(RedisStateProvider.KEY_PREFIX)).thenReturn(new MockPropertyValue("/nifi/components/"));
|
||||
|
||||
// Set up mock validation context
|
||||
Mockito.when(validationContext.getProperty(RedisUtils.CONNECTION_STRING)).thenReturn(new MockPropertyValue("localhost:6379"));
|
||||
Mockito.when(validationContext.getProperty(RedisUtils.REDIS_MODE)).thenReturn(new MockPropertyValue("Standalone"));
|
||||
Mockito.when(validationContext.getProperty(RedisUtils.DATABASE)).thenReturn(new MockPropertyValue("0"));
|
||||
}
|
||||
|
||||
private void enableTls(boolean enable) {
|
||||
Mockito.when(validationContext.getProperty(RedisStateProvider.ENABLE_TLS)).thenReturn(new MockPropertyValue(String.valueOf(enable)));
|
||||
Mockito.when(context.getProperty(RedisStateProvider.ENABLE_TLS)).thenReturn(new MockPropertyValue(String.valueOf(enable)));
|
||||
|
||||
if (enable) {
|
||||
SSLContext sslContext = Mockito.mock(SSLContext.class);
|
||||
Mockito.when(context.getSSLContext()).thenReturn(sslContext);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customValidate_enabledTlsSuccess() throws IOException {
|
||||
this.enableTls(true);
|
||||
|
||||
redisStateProvider.initialize(context);
|
||||
|
||||
Collection<ValidationResult> results = redisStateProvider.customValidate(validationContext);
|
||||
Assert.assertTrue(results.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customValidate_disableTlsSuccess() throws IOException {
|
||||
this.enableTls(false);
|
||||
|
||||
redisStateProvider.initialize(context);
|
||||
|
||||
Collection<ValidationResult> results = redisStateProvider.customValidate(validationContext);
|
||||
Assert.assertTrue(results.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void customValidate_enableTlsButNoSslContext() throws IOException {
|
||||
this.enableTls(true);
|
||||
|
||||
Mockito.when(context.getSSLContext()).thenReturn(null);
|
||||
|
||||
redisStateProvider.initialize(context);
|
||||
|
||||
Collection<ValidationResult> results = redisStateProvider.customValidate(validationContext);
|
||||
Assert.assertEquals(1, results.size());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue