mirror of https://github.com/apache/nifi.git
NIFI-8067 - Fix 1-way SSL in GRPC processors
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com> This closes #4733.
This commit is contained in:
parent
ceb9dff3b9
commit
584adc3b91
|
@ -35,6 +35,11 @@ language governing permissions and limitations under the License. -->
|
|||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-ssl-context-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-security-utils</artifactId>
|
||||
<version>1.13.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
|
|
|
@ -23,21 +23,6 @@ import io.grpc.ManagedChannel;
|
|||
import io.grpc.netty.GrpcSslContexts;
|
||||
import io.grpc.netty.NettyChannelBuilder;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
|
@ -49,6 +34,8 @@ import org.apache.nifi.annotation.documentation.Tags;
|
|||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnShutdown;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.flowfile.FlowFile;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
|
@ -58,9 +45,23 @@ import org.apache.nifi.processor.ProcessSession;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.util.ClientAuth;
|
||||
import org.apache.nifi.security.util.KeyStoreUtils;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@EventDriven
|
||||
@SupportsBatching
|
||||
@Tags({"grpc", "rpc", "client"})
|
||||
|
@ -108,16 +109,18 @@ public class InvokeGRPC extends AbstractProcessor {
|
|||
.build();
|
||||
public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder()
|
||||
.name("Use SSL/TLS")
|
||||
.description("Whether or not to use SSL/TLS to send the contents of the gRPC messages.")
|
||||
.displayName("Use TLS")
|
||||
.description("Whether or not to use TLS to send the contents of the gRPC messages.")
|
||||
.required(false)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
.build();
|
||||
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS (https) connections.")
|
||||
.required(false)
|
||||
.identifiesControllerService(SSLContextService.class)
|
||||
.dependsOn(PROP_USE_SECURE, "true")
|
||||
.build();
|
||||
public static final PropertyDescriptor PROP_SEND_CONTENT = new PropertyDescriptor.Builder()
|
||||
.name("Send FlowFile Content")
|
||||
|
@ -207,6 +210,24 @@ public class InvokeGRPC extends AbstractProcessor {
|
|||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
|
||||
|
||||
final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
|
||||
final boolean sslContextServiceConfigured = context.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet();
|
||||
|
||||
if (useSecure && !sslContextServiceConfigured) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(PROP_SSL_CONTEXT_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(String.format("'%s' must be configured when '%s' is true", PROP_SSL_CONTEXT_SERVICE.getDisplayName(), PROP_USE_SECURE.getDisplayName()))
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whenever this processor is triggered, we need to construct a client in order to communicate
|
||||
* with the configured gRPC service.
|
||||
|
@ -222,7 +243,7 @@ public class InvokeGRPC extends AbstractProcessor {
|
|||
|
||||
final String host = context.getProperty(PROP_SERVICE_HOST).getValue();
|
||||
final int port = context.getProperty(PROP_SERVICE_PORT).asInteger();
|
||||
final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final int maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
String userAgent = USER_AGENT_PREFIX;
|
||||
try {
|
||||
userAgent += "_" + InetAddress.getLocalHost().getHostName();
|
||||
|
@ -240,33 +261,20 @@ public class InvokeGRPC extends AbstractProcessor {
|
|||
// configure whether or not we're using secure comms
|
||||
final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
|
||||
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(ClientAuth.NONE);
|
||||
|
||||
if (useSecure && sslContext != null) {
|
||||
SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
|
||||
if(StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
|
||||
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
|
||||
sslContext.getProvider());
|
||||
final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType());
|
||||
try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) {
|
||||
keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray());
|
||||
}
|
||||
keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray());
|
||||
sslContextBuilder.keyManager(keyManagerFactory);
|
||||
if (useSecure) {
|
||||
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
|
||||
final SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
|
||||
|
||||
if (StringUtils.isNotBlank(sslContextService.getKeyStoreFile())) {
|
||||
sslContextBuilder.keyManager(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration));
|
||||
}
|
||||
|
||||
if(StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
|
||||
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
|
||||
sslContext.getProvider());
|
||||
final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType());
|
||||
try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) {
|
||||
trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray());
|
||||
}
|
||||
trustManagerFactory.init(trustStore);
|
||||
sslContextBuilder.trustManager(trustManagerFactory);
|
||||
if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
|
||||
sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration));
|
||||
}
|
||||
|
||||
nettyChannelBuilder.sslContext(sslContextBuilder.build());
|
||||
|
||||
} else {
|
||||
nettyChannelBuilder.usePlaintext();
|
||||
}
|
||||
|
|
|
@ -23,25 +23,8 @@ import io.grpc.Server;
|
|||
import io.grpc.ServerInterceptors;
|
||||
import io.grpc.netty.GrpcSslContexts;
|
||||
import io.grpc.netty.NettyServerBuilder;
|
||||
import io.netty.handler.ssl.ClientAuth;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.security.KeyStore;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Pattern;
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
import org.apache.nifi.annotation.behavior.WritesAttribute;
|
||||
|
@ -51,6 +34,8 @@ import org.apache.nifi.annotation.documentation.Tags;
|
|||
import org.apache.nifi.annotation.lifecycle.OnScheduled;
|
||||
import org.apache.nifi.annotation.lifecycle.OnStopped;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
|
||||
import org.apache.nifi.processor.DataUnit;
|
||||
|
@ -59,9 +44,21 @@ import org.apache.nifi.processor.ProcessSessionFactory;
|
|||
import org.apache.nifi.processor.Relationship;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.security.util.KeyStoreUtils;
|
||||
import org.apache.nifi.security.util.TlsConfiguration;
|
||||
import org.apache.nifi.ssl.RestrictedSSLContextService;
|
||||
import org.apache.nifi.ssl.SSLContextService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
|
||||
@CapabilityDescription("Starts a gRPC server and listens on the given port to transform the incoming messages into FlowFiles." +
|
||||
" The message format is defined by the standard gRPC protobuf IDL provided by NiFi. gRPC isn't intended to carry large payloads," +
|
||||
|
@ -86,7 +83,7 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
|
|||
public static final PropertyDescriptor PROP_USE_SECURE = new PropertyDescriptor.Builder()
|
||||
.name("Use TLS")
|
||||
.displayName("Use TLS")
|
||||
.description("Whether or not to use TLS to send the contents of the gRPC messages.")
|
||||
.description("Whether or not to use TLS to receive the contents of the gRPC messages.")
|
||||
.required(false)
|
||||
.defaultValue("false")
|
||||
.allowableValues("true", "false")
|
||||
|
@ -94,9 +91,11 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
|
|||
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
|
||||
.name("SSL Context Service")
|
||||
.displayName("SSL Context Service")
|
||||
.description("The SSL Context Service used to provide client certificate information for TLS (https) connections.")
|
||||
.description("The SSL Context Service used to provide server certificate information for TLS (https) connections. Keystore must be configured on the service." +
|
||||
" If truststore is also configured, it will turn on and require client certificate authentication (Mutual TLS).")
|
||||
.required(false)
|
||||
.identifiesControllerService(RestrictedSSLContextService.class)
|
||||
.dependsOn(PROP_USE_SECURE, "true")
|
||||
.build();
|
||||
public static final PropertyDescriptor PROP_FLOW_CONTROL_WINDOW = new PropertyDescriptor.Builder()
|
||||
.name("Flow Control Window")
|
||||
|
@ -121,18 +120,21 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
|
|||
public static final PropertyDescriptor PROP_AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder()
|
||||
.name("Authorized DN Pattern")
|
||||
.displayName("Authorized DN Pattern")
|
||||
.description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.")
|
||||
.required(true)
|
||||
.description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused." +
|
||||
" The property will only be used if client certificate authentication (Mutual TLS) has been configured on " + PROP_SSL_CONTEXT_SERVICE.getDisplayName() + "," +
|
||||
" otherwise it will be ignored.")
|
||||
.required(false)
|
||||
.defaultValue(".*")
|
||||
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
|
||||
.dependsOn(PROP_USE_SECURE, "true")
|
||||
.build();
|
||||
|
||||
public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(
|
||||
PROP_SERVICE_PORT,
|
||||
PROP_USE_SECURE,
|
||||
PROP_SSL_CONTEXT_SERVICE,
|
||||
PROP_FLOW_CONTROL_WINDOW,
|
||||
PROP_AUTHORIZED_DN_PATTERN,
|
||||
PROP_FLOW_CONTROL_WINDOW,
|
||||
PROP_MAX_MESSAGE_SIZE
|
||||
));
|
||||
|
||||
|
@ -153,23 +155,38 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
|
|||
return RELATIONSHIPS;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
return PROPERTIES;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext context) {
|
||||
List<ValidationResult> results = new ArrayList<>(super.customValidate(context));
|
||||
|
||||
final boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
|
||||
final boolean sslContextServiceConfigured = context.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet();
|
||||
|
||||
if (useSecure && !sslContextServiceConfigured) {
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject(PROP_SSL_CONTEXT_SERVICE.getDisplayName())
|
||||
.valid(false)
|
||||
.explanation(String.format("'%s' must be configured when '%s' is true", PROP_SSL_CONTEXT_SERVICE.getDisplayName(), PROP_USE_SECURE.getDisplayName()))
|
||||
.build());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@OnScheduled
|
||||
public void startServer(final ProcessContext context) throws NoSuchAlgorithmException, IOException, KeyStoreException, CertificateException, UnrecoverableKeyException {
|
||||
public void startServer(final ProcessContext context) throws Exception {
|
||||
final ComponentLog logger = getLogger();
|
||||
// gather configured properties
|
||||
final Integer port = context.getProperty(PROP_SERVICE_PORT).asInteger();
|
||||
final Boolean useSecure = context.getProperty(PROP_USE_SECURE).asBoolean();
|
||||
final Integer flowControlWindow = context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue();
|
||||
final Integer maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final int flowControlWindow = context.getProperty(PROP_FLOW_CONTROL_WINDOW).asDataSize(DataUnit.B).intValue();
|
||||
final int maxMessageSize = context.getProperty(PROP_MAX_MESSAGE_SIZE).asDataSize(DataUnit.B).intValue();
|
||||
final SSLContextService sslContextService = context.getProperty(PROP_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
|
||||
final SSLContext sslContext = sslContextService == null ? null : sslContextService.createSSLContext(org.apache.nifi.security.util.ClientAuth.NONE);
|
||||
final Pattern authorizedDnPattern = Pattern.compile(context.getProperty(PROP_AUTHORIZED_DN_PATTERN).getValue());
|
||||
final FlowFileIngestServiceInterceptor callInterceptor = new FlowFileIngestServiceInterceptor(getLogger());
|
||||
callInterceptor.enforceDNPattern(authorizedDnPattern);
|
||||
|
@ -177,46 +194,31 @@ public class ListenGRPC extends AbstractSessionFactoryProcessor {
|
|||
final FlowFileIngestService flowFileIngestService = new FlowFileIngestService(getLogger(),
|
||||
sessionFactoryReference,
|
||||
context);
|
||||
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
|
||||
final NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(port)
|
||||
.addService(ServerInterceptors.intercept(flowFileIngestService, callInterceptor))
|
||||
// default (de)compressor registries handle both plaintext and gzip compressed messages
|
||||
.compressorRegistry(CompressorRegistry.getDefaultInstance())
|
||||
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
|
||||
.flowControlWindow(flowControlWindow)
|
||||
.maxMessageSize(maxMessageSize);
|
||||
.maxInboundMessageSize(maxMessageSize);
|
||||
|
||||
if (useSecure && sslContext != null) {
|
||||
// construct key manager
|
||||
if (useSecure) {
|
||||
if (StringUtils.isBlank(sslContextService.getKeyStoreFile())) {
|
||||
throw new IllegalStateException("SSL is enabled, but no keystore has been configured. You must configure a keystore.");
|
||||
}
|
||||
|
||||
final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm(),
|
||||
sslContext.getProvider());
|
||||
final KeyStore keyStore = KeyStore.getInstance(sslContextService.getKeyStoreType());
|
||||
try (final InputStream is = new FileInputStream(sslContextService.getKeyStoreFile())) {
|
||||
keyStore.load(is, sslContextService.getKeyStorePassword().toCharArray());
|
||||
}
|
||||
keyManagerFactory.init(keyStore, sslContextService.getKeyStorePassword().toCharArray());
|
||||
|
||||
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManagerFactory);
|
||||
final TlsConfiguration tlsConfiguration = sslContextService.createTlsConfiguration();
|
||||
final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(KeyStoreUtils.loadKeyManagerFactory(tlsConfiguration));
|
||||
|
||||
// if the trust store is configured, then client auth is required.
|
||||
if (StringUtils.isNotBlank(sslContextService.getTrustStoreFile())) {
|
||||
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm(),
|
||||
sslContext.getProvider());
|
||||
final KeyStore trustStore = KeyStore.getInstance(sslContextService.getTrustStoreType());
|
||||
try (final InputStream is = new FileInputStream(sslContextService.getTrustStoreFile())) {
|
||||
trustStore.load(is, sslContextService.getTrustStorePassword().toCharArray());
|
||||
}
|
||||
trustManagerFactory.init(trustStore);
|
||||
sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory);
|
||||
sslContextBuilder = sslContextBuilder.clientAuth(io.netty.handler.ssl.ClientAuth.REQUIRE);
|
||||
sslContextBuilder.trustManager(KeyStoreUtils.loadTrustManagerFactory(tlsConfiguration));
|
||||
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
|
||||
} else {
|
||||
sslContextBuilder = sslContextBuilder.clientAuth(io.netty.handler.ssl.ClientAuth.NONE);
|
||||
sslContextBuilder.clientAuth(ClientAuth.NONE);
|
||||
}
|
||||
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
|
||||
serverBuilder = serverBuilder.sslContext(sslContextBuilder.build());
|
||||
GrpcSslContexts.configure(sslContextBuilder);
|
||||
serverBuilder.sslContext(sslContextBuilder.build());
|
||||
}
|
||||
logger.info("Starting gRPC server on port: {}", new Object[]{port.toString()});
|
||||
this.server = serverBuilder.build().start();
|
||||
|
|
|
@ -18,7 +18,7 @@ package org.apache.nifi.processors.grpc;
|
|||
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import org.apache.nifi.flowfile.attributes.CoreAttributes;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSessionFactory;
|
||||
|
@ -31,20 +31,13 @@ import org.apache.nifi.util.TestRunners;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.KeyStoreException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.UnrecoverableKeyException;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.core.StringContains.containsString;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -83,7 +76,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSuccessfulRoundTrip() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
|
||||
public void testSuccessfulRoundTrip() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort);
|
||||
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
|
||||
|
@ -126,7 +119,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testOutOfSpaceRoundTrip() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
|
||||
public void testOutOfSpaceRoundTrip() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort);
|
||||
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
|
||||
|
@ -163,7 +156,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test(expected = io.grpc.StatusRuntimeException.class)
|
||||
public void testExceedMaxMessageSize() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
|
||||
public void testExceedMaxMessageSize() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort);
|
||||
final FlowFileServiceGrpc.FlowFileServiceBlockingStub stub = FlowFileServiceGrpc.newBlockingStub(channel);
|
||||
|
@ -209,7 +202,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSecureTwoWaySSL() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
|
||||
public void testSecureTwoWaySSL() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final Map<String, String> sslProperties = getKeystoreProperties();
|
||||
sslProperties.putAll(getTruststoreProperties());
|
||||
|
@ -256,7 +249,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSecureOneWaySSL() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, InterruptedException {
|
||||
public void testSecureOneWaySSL() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final Map<String, String> sslProperties = getTruststoreProperties();
|
||||
final ManagedChannel channel = TestGRPCClient.buildChannel(HOST, randPort, sslProperties);
|
||||
|
@ -304,7 +297,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test(expected = io.grpc.StatusRuntimeException.class)
|
||||
public void testSecureTwoWaySSLFailAuthorizedDNCheck() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
|
||||
public void testSecureTwoWaySSLFailAuthorizedDNCheck() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final Map<String, String> sslProperties = getKeystoreProperties();
|
||||
sslProperties.putAll(getTruststoreProperties());
|
||||
|
@ -352,7 +345,7 @@ public class ITListenGRPC {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSecureTwoWaySSLPassAuthorizedDNCheck() throws UnrecoverableKeyException, CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException {
|
||||
public void testSecureTwoWaySSLPassAuthorizedDNCheck() throws Exception {
|
||||
final int randPort = TestGRPCClient.randomPort();
|
||||
final Map<String, String> sslProperties = getKeystoreProperties();
|
||||
sslProperties.putAll(getTruststoreProperties());
|
||||
|
|
|
@ -83,14 +83,14 @@ public class TestGRPCClient {
|
|||
*/
|
||||
public static ManagedChannel buildChannel(final String host, final int port, final Map<String, String> sslProperties)
|
||||
throws NoSuchAlgorithmException, KeyStoreException, IOException, CertificateException, UnrecoverableKeyException {
|
||||
NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port)
|
||||
final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port)
|
||||
.directExecutor()
|
||||
.compressorRegistry(CompressorRegistry.getDefaultInstance())
|
||||
.decompressorRegistry(DecompressorRegistry.getDefaultInstance())
|
||||
.userAgent("testAgent");
|
||||
|
||||
if (sslProperties != null) {
|
||||
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
|
||||
final SslContextBuilder sslContextBuilder = SslContextBuilder.forClient();
|
||||
|
||||
if(sslProperties.get(StandardSSLContextService.KEYSTORE.getName()) != null) {
|
||||
final KeyManagerFactory keyManager = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
|
||||
|
@ -101,7 +101,7 @@ public class TestGRPCClient {
|
|||
keyStore.load(is, keyStorePassword.toCharArray());
|
||||
}
|
||||
keyManager.init(keyStore, keyStorePassword.toCharArray());
|
||||
sslContextBuilder = sslContextBuilder.keyManager(keyManager);
|
||||
sslContextBuilder.keyManager(keyManager);
|
||||
}
|
||||
|
||||
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
|
||||
|
@ -113,17 +113,17 @@ public class TestGRPCClient {
|
|||
trustStore.load(is, trustStorePassword.toCharArray());
|
||||
}
|
||||
trustManagerFactory.init(trustStore);
|
||||
sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory);
|
||||
sslContextBuilder.trustManager(trustManagerFactory);
|
||||
}
|
||||
|
||||
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
|
||||
if (clientAuth == null) {
|
||||
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
|
||||
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
|
||||
} else {
|
||||
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
|
||||
sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
|
||||
}
|
||||
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
|
||||
channelBuilder = channelBuilder.sslContext(sslContextBuilder.build());
|
||||
GrpcSslContexts.configure(sslContextBuilder);
|
||||
channelBuilder.sslContext(sslContextBuilder.build());
|
||||
} else {
|
||||
channelBuilder.usePlaintext();
|
||||
}
|
||||
|
|
|
@ -46,8 +46,8 @@ public class TestGRPCServer<T extends BindableService> {
|
|||
public static final String HOST = "localhost";
|
||||
public static final String NEED_CLIENT_AUTH = "needClientAuth";
|
||||
private final Class<T> clazz;
|
||||
private final Map<String, String> sslProperties;
|
||||
private Server server;
|
||||
private Map<String, String> sslProperties;
|
||||
|
||||
/**
|
||||
* Create a gRPC server
|
||||
|
@ -107,7 +107,7 @@ public class TestGRPCServer<T extends BindableService> {
|
|||
keyStore.load(is, keyStorePassword.toCharArray());
|
||||
}
|
||||
keyManager.init(keyStore, keyStorePassword.toCharArray());
|
||||
SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager);
|
||||
final SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(keyManager);
|
||||
|
||||
if (sslProperties.get(StandardSSLContextService.TRUSTSTORE.getName()) != null) {
|
||||
final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
|
||||
|
@ -118,16 +118,16 @@ public class TestGRPCServer<T extends BindableService> {
|
|||
trustStore.load(is, trustStorePassword.toCharArray());
|
||||
}
|
||||
trustManagerFactory.init(trustStore);
|
||||
sslContextBuilder = sslContextBuilder.trustManager(trustManagerFactory);
|
||||
sslContextBuilder.trustManager(trustManagerFactory);
|
||||
}
|
||||
|
||||
final String clientAuth = sslProperties.get(NEED_CLIENT_AUTH);
|
||||
if (clientAuth == null) {
|
||||
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
|
||||
sslContextBuilder.clientAuth(ClientAuth.REQUIRE);
|
||||
} else {
|
||||
sslContextBuilder = sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
|
||||
sslContextBuilder.clientAuth(ClientAuth.valueOf(clientAuth));
|
||||
}
|
||||
sslContextBuilder = GrpcSslContexts.configure(sslContextBuilder);
|
||||
GrpcSslContexts.configure(sslContextBuilder);
|
||||
nettyServerBuilder.sslContext(sslContextBuilder.build());
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue