From 584adc3b917a015b20171b473f2b677343d01779 Mon Sep 17 00:00:00 2001 From: Peter Turcsanyi Date: Wed, 9 Dec 2020 19:51:45 +0100 Subject: [PATCH] NIFI-8067 - Fix 1-way SSL in GRPC processors Signed-off-by: Pierre Villard This closes #4733. --- .../nifi-grpc-processors/pom.xml | 5 + .../nifi/processors/grpc/InvokeGRPC.java | 90 ++++++++------- .../nifi/processors/grpc/ListenGRPC.java | 108 +++++++++--------- .../nifi/processors/grpc/ITListenGRPC.java | 25 ++-- .../nifi/processors/grpc/TestGRPCClient.java | 16 +-- .../nifi/processors/grpc/TestGRPCServer.java | 12 +- 6 files changed, 132 insertions(+), 124 deletions(-) diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml index 11cb58cd98..648a6c8820 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/pom.xml @@ -35,6 +35,11 @@ language governing permissions and limitations under the License. --> org.apache.nifi nifi-ssl-context-service-api + + org.apache.nifi + nifi-security-utils + 1.13.0-SNAPSHOT + org.apache.commons commons-lang3 diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java index 28cae927ef..223dfe57d7 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/InvokeGRPC.java @@ -23,21 +23,6 @@ import io.grpc.ManagedChannel; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyChannelBuilder; import io.netty.handler.ssl.SslContextBuilder; -import java.io.FileInputStream; -import java.io.InputStream; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.KeyStore; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -49,6 +34,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; @@ -58,9 +45,23 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.security.util.ClientAuth; +import org.apache.nifi.security.util.KeyStoreUtils; +import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.ssl.SSLContextService; +import java.io.InputStream; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + @EventDriven @SupportsBatching @Tags({"grpc", "rpc", "client"}) @@ -108,16 +109,18 @@ public class InvokeGRPC extends AbstractProcessor { .build(); public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder() .name("Use SSL/TLS") - .description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.") + .displayName("Use TLS") + .description("Whether or not to use TLS to send the contents of the gRPC messages.") .required(false) .defaultValue("false") .allowableValues("true", "false") .build(); public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") - .description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") + .description("The SSL Context Service used to provide client certificate information for TLS (https) connections.") .required(false) .identifiesControllerService(SSLContextService.class) + .dependsOn(PROP_USE_SECURE, "true") .build(); public static final PropertyDescriptor PROP_SEND_CONTENT = new PropertyDescriptor.Builder() .name("Send FlowFile Content") @@ -207,6 +210,24 @@ public class InvokeGRPC extends AbstractProcessor { return RELATIONSHIPS; } + @Override + protected Collection customValidate(ValidationContext context) { + List results = new ArrayList<>(super.customValidate(context)); + + final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean(); + final boolean sslContextServiceConfigured = context.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet(); + + if (useSecure && !sslContextServiceConfigured) { + results.add(new ValidationResult.Builder() + .subject(PROP_SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .explanation(String.format("'%s' must be configured when '%s' is true", PROP_SSL_CONTEXT_SERVICE.getDisplayName(), PROP_USE_SECURE.getDisplayName())) + .build()); + } + + return results; + } + /** * Whenever this processor is triggered, we need to construct a client in order to communicate * with the configured gRPC service. @@ -222,7 +243,7 @@ public class InvokeGRPC extends AbstractProcessor { final String host = context.getProperty(PROP_SERVICE_HOST).getValue(); final int port = context.getProperty(PROP_SERVICE_PORT).asInteger(); - final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); + final int maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); String userAgent = USER_AGENT_PREFIX; try { userAgent += "_" + InetAddress.getLocalHost().getHostName(); @@ -240,33 +261,20 @@ public class InvokeGRPC extends AbstractProcessor { // configure whether or not we're using secure comms final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean(); final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(ClientAuth.NONE); - if (useSecure && sslContext != null) { - SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient(); - if(StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) { - final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(), - sslContext.getProvider()); - final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType()); - try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) { - keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray()); - } - keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray()); - sslContextBuilder.keyManager(keyManagerFactory); + if (useSecure) { + final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration(); + final SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient(); + + if (StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) { + sslContextBuilder.keyManager(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration)); } - if(StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) { - final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(), - sslContext.getProvider()); - final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType()); - try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) { - trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray()); - } - trustManagerFactory.init(trustStore); - sslContextBuilder.trustManager(trustManagerFactory); + if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) { + sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration)); } + nettyChannelBuilder.sslContext(sslContextBuilder.build()); - } else { nettyChannelBuilder.usePlaintext(); } diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java index f9e8616969..4ce9c748fa 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/main/java/org/apache/nifi/processors/grpc/ListenGRPC.java @@ -23,25 +23,8 @@ import io.grpc.Server; import io.grpc.ServerInterceptors; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NettyServerBuilder; +import io.netty.handler.ssl.ClientAuth; import io.netty.handler.ssl.SslContextBuilder; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.WritesAttribute; @@ -51,6 +34,8 @@ import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.DataUnit; @@ -59,9 +44,21 @@ import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.KeyStoreUtils; +import org.apache.nifi.security.util.TlsConfiguration; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Starts a gRPC server and listens on the given port to transform the incoming messages into FlowFiles." + " The message format is defined by the standard gRPC protobuf IDL provided by NiFi. gRPC isn't intended to carry large payloads," + @@ -86,7 +83,7 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder() .name("Use TLS") .displayName("Use TLS") - .description("Whether or not to use TLS to send the contents of the gRPC messages.") + .description("Whether or not to use TLS to receive the contents of the gRPC messages.") .required(false) .defaultValue("false") .allowableValues("true", "false") @@ -94,9 +91,11 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .displayName("SSL Context Service") - .description("The SSL Context Service used to provide client certificate information for TLS (https) connections.") + .description("The SSL Context Service used to provide server certificate information for TLS (https) connections. Keystore must be configured on the service." + + " If truststore is also configured, it will turn on and require client certificate authentication (Mutual TLS).") .required(false) .identifiesControllerService(RestrictedSSLContextService.class) + .dependsOn(PROP_USE_SECURE, "true") .build(); public static final PropertyDescriptor PROP_FLOW_CONTROL_WINDOW = new PropertyDescriptor.Builder() .name("Flow Control Window") @@ -121,18 +120,21 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor { public static final PropertyDescriptor PROP_AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder() .name("Authorized DN Pattern") .displayName("Authorized DN Pattern") - .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.") - .required(true) + .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused." + + " The property will only be used if client certificate authentication (Mutual TLS) has been configured on " + PROP_SSL_CONTEXT_SERVICE.getDisplayName() + "," + + " otherwise it will be ignored.") + .required(false) .defaultValue(".*") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) + .dependsOn(PROP_USE_SECURE, "true") .build(); public static final List PROPERTIES = Collections.unmodifiableList(Arrays.asList( PROP_SERVICE_PORT, PROP_USE_SECURE, PROP_SSL_CONTEXT_SERVICE, - PROP_FLOW_CONTROL_WINDOW, PROP_AUTHORIZED_DN_PATTERN, + PROP_FLOW_CONTROL_WINDOW, PROP_MAX_MESSAGE_SIZE )); @@ -153,23 +155,38 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor { return RELATIONSHIPS; } - @Override protected List getSupportedPropertyDescriptors() { return PROPERTIES; } + @Override + protected Collection customValidate(ValidationContext context) { + List results = new ArrayList<>(super.customValidate(context)); + + final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean(); + final boolean sslContextServiceConfigured = context.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet(); + + if (useSecure && !sslContextServiceConfigured) { + results.add(new ValidationResult.Builder() + .subject(PROP_SSL_CONTEXT_SERVICE.getDisplayName()) + .valid(false) + .explanation(String.format("'%s' must be configured when '%s' is true", PROP_SSL_CONTEXT_SERVICE.getDisplayName(), PROP_USE_SECURE.getDisplayName())) + .build()); + } + + return results; + } @OnScheduled - public void startServer(final ProcessContext context) throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException { + public void startServer(final ProcessContext context) throws Exception { final ComponentLog logger = getLogger(); // gather configured properties final Integer port = context.getProperty(PROP_SERVICE_PORT).asInteger(); final Boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean(); - final Integer flowControlWindow = context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue(); - final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); + final int flowControlWindow = context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue(); + final int maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue(); final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(org.apache.nifi.security.util.ClientAuth.NONE); final Pattern authorizedDnPattern = Pattern.compile(context.getProperty(PROP_AUTHORIZED_DN_PATTERN).getValue()); final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger()); callInterceptor.enforceDNPattern(authorizedDnPattern); @@ -177,46 +194,31 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor { final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(), sessionFactoryReference, context); - NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) + final NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port) .addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor)) // default (de)compressor registries handle both plaintext and gzip compressed messages .compressorRegistry(CompressorRegistry.getDefaultInstance()) .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) .flowControlWindow(flowControlWindow) - .maxMessageSize(maxMessageSize); + .maxInboundMessageSize(maxMessageSize); - if (useSecure && sslContext != null) { - // construct key manager + if (useSecure) { if (StringUtils.isBlank(sslContextService.getKeyStoreFile())) { throw new IllegalStateException("SSL is enabled, but no keystore has been configured. You must configure a keystore."); } - final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(), - sslContext.getProvider()); - final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType()); - try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) { - keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray()); - } - keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray()); - - SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory); + final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration(); + final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration)); // if the trust store is configured, then client auth is required. if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) { - final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(), - sslContext.getProvider()); - final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType()); - try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) { - trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray()); - } - trustManagerFactory.init(trustStore); - sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory); - sslContextBuilder = sslContextBuilder.clientAuth(io.netty.handler.ssl.ClientAuth.REQUIRE); + sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration)); + sslContextBuilder.clientAuth(ClientAuth.REQUIRE); } else { - sslContextBuilder = sslContextBuilder.clientAuth(io.netty.handler.ssl.ClientAuth.NONE); + sslContextBuilder.clientAuth(ClientAuth.NONE); } - sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder); - serverBuilder = serverBuilder.sslContext(sslContextBuilder.build()); + GrpcSslContexts.configure(sslContextBuilder); + serverBuilder.sslContext(sslContextBuilder.build()); } logger.info("Starting gRPC server on port: {}", new Object[]{port.toString()}); this.server = serverBuilder.build().start(); diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java index 16929a4179..9adf26c1d3 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/ITListenGRPC.java @@ -18,7 +18,7 @@ package org.apache.nifi.processors.grpc; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; - +import io.grpc.ManagedChannel; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; @@ -31,20 +31,13 @@ import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; import java.util.HashMap; import java.util.List; import java.util.Map; -import io.grpc.ManagedChannel; - import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.StringContains.containsString; -import static org.junit.Assert.assertThat; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -83,7 +76,7 @@ public class ITListenGRPC { } @Test - public void testSuccessfulRoundTrip() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + public void testSuccessfulRoundTrip() throws Exception { final int randPort = TestGRPCClient.randomPort(); final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort); final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel); @@ -126,7 +119,7 @@ public class ITListenGRPC { } @Test - public void testOutOfSpaceRoundTrip() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + public void testOutOfSpaceRoundTrip() throws Exception { final int randPort = TestGRPCClient.randomPort(); final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort); final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel); @@ -163,7 +156,7 @@ public class ITListenGRPC { } @Test(expected = io.grpc.StatusRuntimeException.class) - public void testExceedMaxMessageSize() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + public void testExceedMaxMessageSize() throws Exception { final int randPort = TestGRPCClient.randomPort(); final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort); final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel); @@ -209,7 +202,7 @@ public class ITListenGRPC { } @Test - public void testSecureTwoWaySSL() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + public void testSecureTwoWaySSL() throws Exception { final int randPort = TestGRPCClient.randomPort(); final Map sslProperties = getKeystoreProperties(); sslProperties.putAll(getTruststoreProperties()); @@ -256,7 +249,7 @@ public class ITListenGRPC { } @Test - public void testSecureOneWaySSL() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, InterruptedException { + public void testSecureOneWaySSL() throws Exception { final int randPort = TestGRPCClient.randomPort(); final Map sslProperties = getTruststoreProperties(); final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort, sslProperties); @@ -304,7 +297,7 @@ public class ITListenGRPC { } @Test(expected = io.grpc.StatusRuntimeException.class) - public void testSecureTwoWaySSLFailAuthorizedDNCheck() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + public void testSecureTwoWaySSLFailAuthorizedDNCheck() throws Exception { final int randPort = TestGRPCClient.randomPort(); final Map sslProperties = getKeystoreProperties(); sslProperties.putAll(getTruststoreProperties()); @@ -352,7 +345,7 @@ public class ITListenGRPC { } @Test - public void testSecureTwoWaySSLPassAuthorizedDNCheck() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException { + public void testSecureTwoWaySSLPassAuthorizedDNCheck() throws Exception { final int randPort = TestGRPCClient.randomPort(); final Map sslProperties = getKeystoreProperties(); sslProperties.putAll(getTruststoreProperties()); diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java index 28e94c6ff6..dbddf79cea 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCClient.java @@ -83,14 +83,14 @@ public class TestGRPCClient { */ public static ManagedChannel buildChannel(final String host, final int port, final Map sslProperties) throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException { - NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port) + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port) .directExecutor() .compressorRegistry(CompressorRegistry.getDefaultInstance()) .decompressorRegistry(DecompressorRegistry.getDefaultInstance()) .userAgent("testAgent"); if (sslProperties != null) { - SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); + final SslContextBuilder sslContextBuilder = SslContextBuilder.forClient(); if(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) { final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); @@ -101,7 +101,7 @@ public class TestGRPCClient { keyStore.load(is, keyStorePassword.toCharArray()); } keyManager.init(keyStore, keyStorePassword.toCharArray()); - sslContextBuilder = sslContextBuilder.keyManager(keyManager); + sslContextBuilder.keyManager(keyManager); } if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) { @@ -113,17 +113,17 @@ public class TestGRPCClient { trustStore.load(is, trustStorePassword.toCharArray()); } trustManagerFactory.init(trustStore); - sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory); + sslContextBuilder.trustManager(trustManagerFactory); } final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); if (clientAuth == null) { - sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + sslContextBuilder.clientAuth(ClientAuth.REQUIRE); } else { - sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)); + sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)); } - sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder); - channelBuilder = channelBuilder.sslContext(sslContextBuilder.build()); + GrpcSslContexts.configure(sslContextBuilder); + channelBuilder.sslContext(sslContextBuilder.build()); } else { channelBuilder.usePlaintext(); } diff --git a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java index 7e11b4db48..b885853064 100644 --- a/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java +++ b/nifi-nar-bundles/nifi-grpc-bundle/nifi-grpc-processors/src/test/java/org/apache/nifi/processors/grpc/TestGRPCServer.java @@ -46,8 +46,8 @@ public class TestGRPCServer { public static final String HOST = "localhost"; public static final String NEED_CLIENT_AUTH = "needClientAuth"; private final Class clazz; + private final Map sslProperties; private Server server; - private Map sslProperties; /** * Create a gRPC server @@ -107,7 +107,7 @@ public class TestGRPCServer { keyStore.load(is, keyStorePassword.toCharArray()); } keyManager.init(keyStore, keyStorePassword.toCharArray()); - SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager); + final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager); if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) { final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); @@ -118,16 +118,16 @@ public class TestGRPCServer { trustStore.load(is, trustStorePassword.toCharArray()); } trustManagerFactory.init(trustStore); - sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory); + sslContextBuilder.trustManager(trustManagerFactory); } final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH); if (clientAuth == null) { - sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE); + sslContextBuilder.clientAuth(ClientAuth.REQUIRE); } else { - sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)); + sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth)); } - sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder); + GrpcSslContexts.configure(sslContextBuilder); nettyServerBuilder.sslContext(sslContextBuilder.build()); }