NIFI-9897 This closes #5949. Refactored GRPC SSL Configuration

- Added SslContextProvider abstracting Netty SslContext configuration
- Removed runtime dependency on nifi-security-utils from nifi-grpc-processors
- Added TestListenGRPC with methods verifying socket connection and protocol negotiation

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
exceptionfactory 2022-04-08 14:46:46 -05:00 committed by Joe Witt
parent e76acabe9e
commit efd2421154
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
6 changed files with 324 additions and 44 deletions

View File

@ -37,7 +37,7 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<artifactId>nifi-security-utils-api</artifactId>
<version>1.17.0-SNAPSHOT</version>
</dependency>
<dependency>
@ -80,6 +80,12 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-ssl-context-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
<version>1.17.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>

View File

@ -20,10 +20,8 @@ import com.google.protobuf.ByteString;
import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.ManagedChannel;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContextBuilder;
import org.apache.commons.lang3.StringUtils;
import io.netty.handler.ssl.SslContext;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@ -45,8 +43,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.processors.grpc.ssl.SslContextProvider;
import org.apache.nifi.ssl.SSLContextService;
import java.io.InputStream;
@ -235,7 +232,7 @@ public class InvokeGRPC extends AbstractProcessor {
* @param context the processor context
*/
@OnScheduled
public void initializeClient(final ProcessContext context) throws Exception {
public void initializeClient(final ProcessContext context) {
channelReference.set(null);
blockingStubReference.set(null);
@ -263,18 +260,8 @@ public class InvokeGRPC extends AbstractProcessor {
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (useSecure) {
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
final SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
if (StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
sslContextBuilder.keyManager(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration));
}
if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration));
}
nettyChannelBuilder.sslContext(sslContextBuilder.build());
final SslContext clientSslContext = SslContextProvider.getSslContext(sslContextService, true);
nettyChannelBuilder.sslContext(clientSslContext);
} else {
nettyChannelBuilder.usePlaintext();
}
@ -439,11 +426,11 @@ public class InvokeGRPC extends AbstractProcessor {
private void logRequest(final ComponentLog logger, final String host, final String port, final FlowFileRequest flowFileRequest) {
logger.debug("\nRequest to remote service:\n\t{}\n{}",
new Object[]{getRemote(host, port), flowFileRequest.toString()});
getRemote(host, port), flowFileRequest.toString());
}
private void logReply(final ComponentLog logger, final String host, final String port, final FlowFileReply flowFileReply) {
logger.debug("\nResponse from remote service:\n\t{}\n{}",
new Object[]{getRemote(host, port), flowFileReply.toString()});
getRemote(host, port), flowFileReply.toString());
}
}

View File

@ -21,11 +21,8 @@ import io.grpc.CompressorRegistry;
import io.grpc.DecompressorRegistry;
import io.grpc.Server;
import io.grpc.ServerInterceptors;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.SslContextBuilder;
import org.apache.commons.lang3.StringUtils;
import io.netty.handler.ssl.SslContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
@ -44,8 +41,7 @@ import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.KeyStoreUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.processors.grpc.ssl.SslContextProvider;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;
@ -203,24 +199,10 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
.maxInboundMessageSize(maxMessageSize);
if (useSecure) {
if (StringUtils.isBlank(sslContextService.getKeyStoreFile())) {
throw new IllegalStateException("SSL is enabled, but no keystore has been configured. You must configure a keystore.");
}
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration));
// if the trust store is configured, then client auth is required.
if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration));
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
} else {
sslContextBuilder.clientAuth(ClientAuth.NONE);
}
GrpcSslContexts.configure(sslContextBuilder);
serverBuilder.sslContext(sslContextBuilder.build());
final SslContext serverSslContext = SslContextProvider.getSslContext(sslContextService, false);
serverBuilder.sslContext(serverSslContext);
}
logger.info("Starting gRPC server on port: {}", new Object[]{port.toString()});
logger.info("Starting gRPC server on port: {}", port.toString());
this.server = serverBuilder.build().start();
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc.ssl;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
/**
* Provider for Netty SslContext from NiFi SSLContextService
*/
public class SslContextProvider {
private static final boolean START_TLS = false;
private static final String H2_PROTOCOL = "h2";
public static SslContext getSslContext(final SSLContextService sslContextService, final boolean client) {
final SSLContext sslContext = sslContextService.createContext();
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
final ClientAuth clientAuth = StringUtils.isBlank(tlsConfiguration.getTruststorePath()) ? ClientAuth.NONE : ClientAuth.REQUIRE;
final ApplicationProtocolConfig applicationProtocolConfig = new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
H2_PROTOCOL
);
return new JdkSslContext(
sslContext,
client,
Http2SecurityUtil.CIPHERS,
SupportedCipherSuiteFilter.INSTANCE,
applicationProtocolConfig,
clientAuth,
tlsConfiguration.getEnabledProtocols(),
START_TLS
);
}
}

View File

@ -0,0 +1,134 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@Timeout(10)
@ExtendWith(MockitoExtension.class)
class TestListenGRPC {
static final String LOCALHOST = "localhost";
static final String SSL_SERVICE_ID = RestrictedSSLContextService.class.getSimpleName();
static TlsConfiguration tlsConfiguration;
static SSLContext sslContext;
@Mock
RestrictedSSLContextService sslContextService;
TestRunner runner;
@BeforeAll
static void setTlsConfiguration() throws TlsException {
tlsConfiguration = new TemporaryKeyStoreBuilder().build();
sslContext = SslContextFactory.createSslContext(tlsConfiguration);
}
@BeforeEach
void setRunner() {
runner = TestRunners.newTestRunner(ListenGRPC.class);
}
@Test
void testRunSocketListening() throws IOException {
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, Integer.toString(port));
runner.assertValid();
runner.run(1, false);
assertSocketConnected(port, SocketFactory.getDefault());
}
@Test
void testRunSocketListeningSslContextService() throws IOException, InitializationException {
final int port = NetworkUtils.getAvailableTcpPort();
runner.setProperty(ListenGRPC.PROP_SERVICE_PORT, Integer.toString(port));
when(sslContextService.getIdentifier()).thenReturn(SSL_SERVICE_ID);
when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration);
when(sslContextService.createContext()).thenReturn(sslContext);
runner.addControllerService(SSL_SERVICE_ID, sslContextService);
runner.enableControllerService(sslContextService);
runner.setProperty(ListenGRPC.PROP_SSL_CONTEXT_SERVICE, SSL_SERVICE_ID);
runner.setProperty(ListenGRPC.PROP_USE_SECURE, Boolean.TRUE.toString());
runner.assertValid();
runner.run(1, false);
assertSocketConnectedProtocolNegotiated(port, sslContext.getSocketFactory());
}
private void assertSocketConnected(final int port, final SocketFactory socketFactory) throws IOException {
try (final Socket socket = socketFactory.createSocket()) {
assertSocketConnected(port, socket);
}
}
private void assertSocketConnectedProtocolNegotiated(final int port, final SSLSocketFactory socketFactory) throws IOException {
try (final SSLSocket socket = (SSLSocket) socketFactory.createSocket()) {
assertSocketConnected(port, socket);
socket.startHandshake();
final SSLSession session = socket.getSession();
assertNotNull(session);
assertNotNull(session.getCipherSuite());
}
}
private void assertSocketConnected(final int port, final Socket socket) throws IOException {
final InetSocketAddress socketAddress = new InetSocketAddress(LOCALHOST, port);
socket.connect(socketAddress);
assertTrue(socket.isConnected());
}
}

View File

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.grpc.ssl;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.SslContext;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TemporaryKeyStoreBuilder;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class SslContextProviderTest {
static final List<String> APPLICATION_PROTOCOLS = Collections.singletonList("h2");
static TlsConfiguration tlsConfiguration;
static SSLContext sslContext;
@Mock
RestrictedSSLContextService sslContextService;
@BeforeAll
static void setTlsConfiguration() throws TlsException {
tlsConfiguration = new TemporaryKeyStoreBuilder().build();
sslContext = SslContextFactory.createSslContext(tlsConfiguration);
}
@Test
void testGetClientSslContext() {
when(sslContextService.createContext()).thenReturn(sslContext);
when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration);
final SslContext clientSslContext = SslContextProvider.getSslContext(sslContextService, true);
assertNotNull(clientSslContext);
assertTrue(clientSslContext.isClient());
assertFalse(clientSslContext.isServer());
assertEquals(APPLICATION_PROTOCOLS, clientSslContext.applicationProtocolNegotiator().protocols());
}
@Test
void testGetServerSslContextClientAuthRequired() {
when(sslContextService.createContext()).thenReturn(sslContext);
when(sslContextService.createTlsConfiguration()).thenReturn(tlsConfiguration);
final SslContext serverSslContext = SslContextProvider.getSslContext(sslContextService, false);
assertServerStatus(serverSslContext);
final SSLEngine sslEngine = serverSslContext.newEngine(ByteBufAllocator.DEFAULT);
assertTrue(sslEngine.getNeedClientAuth());
}
@Test
void testGetServerSslContextClientAuthNone() {
when(sslContextService.createContext()).thenReturn(sslContext);
final TlsConfiguration keyStoreConfiguration = new StandardTlsConfiguration();
when(sslContextService.createTlsConfiguration()).thenReturn(keyStoreConfiguration);
final SslContext serverSslContext = SslContextProvider.getSslContext(sslContextService, false);
assertServerStatus(serverSslContext);
final SSLEngine sslEngine = serverSslContext.newEngine(ByteBufAllocator.DEFAULT);
assertFalse(sslEngine.getNeedClientAuth());
}
private void assertServerStatus(final SslContext configuredSslContext) {
assertNotNull(configuredSslContext);
assertFalse(configuredSslContext.isClient());
assertTrue(configuredSslContext.isServer());
assertEquals(APPLICATION_PROTOCOLS, configuredSslContext.applicationProtocolNegotiator().protocols());
}
}