mirror of https://github.com/apache/nifi.git
NIFI-7198: Adding optional SSL Context Service to RedisConnectionPoolService
This closes #4981 Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
parent
bfd964b9c7
commit
76f33e42c6
|
@ -78,5 +78,10 @@
|
|||
<version>0.6</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
|
|||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.redis.RedisType;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
import org.apache.nifi.util.StringUtils;
|
||||
import org.springframework.data.redis.connection.RedisClusterConfiguration;
|
||||
import org.springframework.data.redis.connection.RedisConfiguration;
|
||||
|
@ -35,6 +37,7 @@ import org.springframework.data.redis.connection.jedis.JedisClientConfiguration;
|
|||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||
import redis.clients.jedis.JedisPoolConfig;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.time.Duration;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
|
@ -234,6 +237,15 @@ public class RedisUtils {
|
|||
.required(true)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.displayName("SSL Context Service")
|
||||
.description("If specified, this service will be used to create an SSL Context that will be used "
|
||||
+ "to secure communications; if not specified, communications will not be secure")
|
||||
.required(false)
|
||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> REDIS_CONNECTION_PROPERTY_DESCRIPTORS;
|
||||
static {
|
||||
final List<PropertyDescriptor> props = new ArrayList<>();
|
||||
|
@ -244,6 +256,7 @@ public class RedisUtils {
|
|||
props.add(RedisUtils.CLUSTER_MAX_REDIRECTS);
|
||||
props.add(RedisUtils.SENTINEL_MASTER);
|
||||
props.add(RedisUtils.PASSWORD);
|
||||
props.add(RedisUtils.SSL_CONTEXT_SERVICE);
|
||||
props.add(RedisUtils.POOL_MAX_TOTAL);
|
||||
props.add(RedisUtils.POOL_MAX_IDLE);
|
||||
props.add(RedisUtils.POOL_MIN_IDLE);
|
||||
|
@ -268,12 +281,23 @@ public class RedisUtils {
|
|||
final Integer timeout = context.getProperty(RedisUtils.COMMUNICATION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
|
||||
final JedisPoolConfig poolConfig = createJedisPoolConfig(context);
|
||||
|
||||
final JedisClientConfiguration jedisClientConfiguration = JedisClientConfiguration.builder()
|
||||
JedisClientConfiguration.JedisClientConfigurationBuilder builder = JedisClientConfiguration.builder()
|
||||
.connectTimeout(Duration.ofMillis(timeout))
|
||||
.readTimeout(Duration.ofMillis(timeout))
|
||||
.usePooling()
|
||||
.poolConfig(poolConfig)
|
||||
.build();
|
||||
.and();
|
||||
|
||||
if (context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).isSet()) {
|
||||
final SSLContextService sslContextService = context.getProperty(RedisUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final SSLContext sslContext = sslContextService.createContext();
|
||||
builder = builder.useSsl()
|
||||
.sslParameters(sslContext.getSupportedSSLParameters())
|
||||
.sslSocketFactory(sslContext.getSocketFactory())
|
||||
.and();
|
||||
}
|
||||
|
||||
final JedisClientConfiguration jedisClientConfiguration = builder.build();
|
||||
JedisConnectionFactory connectionFactory;
|
||||
|
||||
if (RedisUtils.REDIS_MODE_STANDALONE.getValue().equals(redisMode)) {
|
||||
|
|
|
@ -19,17 +19,37 @@ package org.apache.nifi.redis.service;
|
|||
import org.apache.nifi.redis.RedisConnectionPool;
|
||||
import org.apache.nifi.redis.util.RedisUtils;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.StandardProcessorTestRunner;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
import java.security.GeneralSecurityException;
|
||||
|
||||
public class TestRedisConnectionPoolService {
|
||||
|
||||
public static final String SSL_CONTEXT_IDENTIFIER = "ssl-context-service";
|
||||
private TestRunner testRunner;
|
||||
private FakeRedisProcessor proc;
|
||||
private RedisConnectionPool redisService;
|
||||
|
||||
private static SSLContext sslContext;
|
||||
|
||||
@BeforeClass
|
||||
public static void classSetup() throws IOException, GeneralSecurityException {
|
||||
sslContext = SSLContext.getDefault();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setup() throws InitializationException {
|
||||
proc = new FakeRedisProcessor();
|
||||
|
@ -39,6 +59,69 @@ public class TestRedisConnectionPoolService {
|
|||
testRunner.addControllerService("redis-service", redisService);
|
||||
}
|
||||
|
||||
private void enableSslContextService() throws InitializationException {
|
||||
final RestrictedSSLContextService sslContextService = Mockito.mock(RestrictedSSLContextService.class);
|
||||
Mockito.when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_IDENTIFIER);
|
||||
Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
|
||||
testRunner.addControllerService(SSL_CONTEXT_IDENTIFIER, sslContextService);
|
||||
testRunner.enableControllerService(sslContextService);
|
||||
testRunner.setProperty(redisService, RedisUtils.SSL_CONTEXT_SERVICE, SSL_CONTEXT_IDENTIFIER);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSLContextService() throws InitializationException {
|
||||
this.setDefaultRedisProperties();
|
||||
|
||||
this.enableSslContextService();
|
||||
|
||||
testRunner.assertValid(redisService);
|
||||
testRunner.enableControllerService(redisService);
|
||||
|
||||
JedisConnectionFactory connectionFactory = getJedisConnectionFactory(); // Uses config from test runner
|
||||
|
||||
// Verify that the client configuration will be using an SSL socket factory and SSL parameters
|
||||
Assert.assertTrue(connectionFactory.getClientConfiguration().getSslSocketFactory().isPresent());
|
||||
Assert.assertTrue(connectionFactory.getClientConfiguration().getSslParameters().isPresent());
|
||||
|
||||
// Now remove the SSL context service
|
||||
testRunner.disableControllerService(redisService);
|
||||
testRunner.removeProperty(redisService, RedisUtils.SSL_CONTEXT_SERVICE);
|
||||
testRunner.enableControllerService(redisService);
|
||||
connectionFactory = getJedisConnectionFactory();
|
||||
|
||||
// Now the client configuration will not use SSL
|
||||
Assert.assertFalse(connectionFactory.getClientConfiguration().getSslSocketFactory().isPresent());
|
||||
Assert.assertFalse(connectionFactory.getClientConfiguration().getSslParameters().isPresent());
|
||||
}
|
||||
|
||||
private void setDefaultRedisProperties() {
|
||||
testRunner.setProperty(redisService, RedisUtils.REDIS_MODE, "Standalone");
|
||||
testRunner.setProperty(redisService, RedisUtils.CONNECTION_STRING, "localhost:6379");
|
||||
testRunner.setProperty(redisService, RedisUtils.DATABASE, "0");
|
||||
testRunner.setProperty(redisService, RedisUtils.COMMUNICATION_TIMEOUT, "60s");
|
||||
testRunner.setProperty(redisService, RedisUtils.CLUSTER_MAX_REDIRECTS, "5");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_MAX_TOTAL, "1");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_MAX_IDLE, "1");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_MIN_IDLE, "1");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_BLOCK_WHEN_EXHAUSTED, "true");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_MAX_WAIT_TIME, "1s");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_MIN_EVICTABLE_IDLE_TIME, "1s");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_TIME_BETWEEN_EVICTION_RUNS, "1s");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_NUM_TESTS_PER_EVICTION_RUN, "1");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_TEST_ON_CREATE, "false");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_TEST_ON_BORROW, "false");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_TEST_ON_RETURN, "false");
|
||||
testRunner.setProperty(redisService, RedisUtils.POOL_TEST_WHILE_IDLE, "false");
|
||||
}
|
||||
|
||||
private JedisConnectionFactory getJedisConnectionFactory() {
|
||||
MockProcessContext processContext = ((StandardProcessorTestRunner) testRunner).getProcessContext();
|
||||
MockConfigurationContext configContext = new MockConfigurationContext(processContext.getControllerServices()
|
||||
.get(redisService.getIdentifier()).getProperties(), processContext);
|
||||
JedisConnectionFactory connectionFactory = RedisUtils.createConnectionFactory(configContext, testRunner.getLogger());
|
||||
return connectionFactory;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateConnectionString() {
|
||||
testRunner.assertNotValid(redisService);
|
||||
|
|
Loading…
Reference in New Issue