NIFI-2423: Make use of the SSLContextService to provide SSL information

Signed-off-by: Oleg Zhurakousky <oleg@suitcase.io>
This commit is contained in:
Mark Payne 2016-08-03 14:20:29 -04:00 committed by Oleg Zhurakousky
parent c39a127ec8
commit 7ffa30d21b
4 changed files with 43 additions and 26 deletions

View File

@ -31,5 +31,10 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-kafka-pubsub-processors</artifactId> <artifactId>nifi-kafka-pubsub-processors</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -34,6 +34,10 @@
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId> <artifactId>nifi-utils</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>

View File

@ -43,6 +43,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -115,29 +116,12 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
.expressionLanguageSupported(true) .expressionLanguageSupported(true)
.build(); .build();
static final PropertyDescriptor SSL_KEY_PASSWORD = new PropertyDescriptor.Builder() static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.key.password") .name("ssl.context.service")
.displayName("SSL Key Password") .displayName("SSL Context Service")
.description("The password of the private key in the key store file. Corresponds to Kafka's 'ssl.key.password' property.") .description("Specifies the SSL Context Service to use for communicating with Kafka.")
.required(false) .required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR) .identifiesControllerService(SSLContextService.class)
.sensitive(true)
.build();
static final PropertyDescriptor SSL_KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
.name("ssl.keystore.password")
.displayName("SSK Keystore Password")
.description("The store password for the key store file. Corresponds to Kafka's 'ssl.keystore.password' property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor SSL_TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
.name("ssl.truststore.password")
.displayName("SSL Truststore Password")
.description("The password for the trust store file. Corresponds to Kafka's 'ssl.truststore.password' property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build(); .build();
static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder() static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder()
@ -166,9 +150,7 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
SHARED_DESCRIPTORS.add(CLIENT_ID); SHARED_DESCRIPTORS.add(CLIENT_ID);
SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL); SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE); SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
SHARED_DESCRIPTORS.add(SSL_KEY_PASSWORD); SHARED_DESCRIPTORS.add(SSL_CONTEXT_SERVICE);
SHARED_DESCRIPTORS.add(SSL_KEYSTORE_PASSWORD);
SHARED_DESCRIPTORS.add(SSL_TRUSTSTORE_PASSWORD);
SHARED_RELATIONSHIPS.add(REL_SUCCESS); SHARED_RELATIONSHIPS.add(REL_SUCCESS);
} }
@ -347,6 +329,13 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
Properties buildKafkaProperties(ProcessContext context) { Properties buildKafkaProperties(ProcessContext context) {
Properties properties = new Properties(); Properties properties = new Properties();
for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) { for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
// Translate SSLContext Service configuration into Kafka properties
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
buildSSLKafkaProperties(sslContextService, properties);
continue;
}
String pName = propertyDescriptor.getName(); String pName = propertyDescriptor.getName();
String pValue = propertyDescriptor.isExpressionLanguageSupported() String pValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue() ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
@ -360,4 +349,24 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
} }
return properties; return properties;
} }
private void buildSSLKafkaProperties(final SSLContextService sslContextService, final Properties properties) {
if (sslContextService == null) {
return;
}
if (sslContextService.isKeyStoreConfigured()) {
properties.setProperty("ssl.keystore.location", sslContextService.getKeyStoreFile());
properties.setProperty("ssl.keystore.password", sslContextService.getKeyStorePassword());
final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
properties.setProperty("ssl.key.password", keyPass);
properties.setProperty("ssl.keystore.type", sslContextService.getKeyStoreType());
}
if (sslContextService.isTrustStoreConfigured()) {
properties.setProperty("ssl.truststore.location", sslContextService.getTrustStoreFile());
properties.setProperty("ssl.truststore.password", sslContextService.getTrustStorePassword());
properties.setProperty("ssl.truststore.type", sslContextService.getTrustStoreType());
}
}
} }

View File

@ -242,7 +242,6 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]
this.checkIfInitialConnectionPossible(); this.checkIfInitialConnectionPossible();
} }
System.out.println(kafkaProperties);
if (!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { if (!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
} }