diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
index a9e3eb0815..cb3be3854b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-nar/pom.xml
@@ -31,5 +31,10 @@
org.apache.nifi
nifi-kafka-pubsub-processors
+
+ org.apache.nifi
+ nifi-standard-services-api-nar
+ nar
+
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
index 53e309a920..3ad8e37e0f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/pom.xml
@@ -34,6 +34,10 @@
org.apache.nifi
nifi-utils
+
+ org.apache.nifi
+ nifi-ssl-context-service-api
+
org.apache.kafka
kafka-clients
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
index 04f9365149..c2c232137b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessor.java
@@ -43,6 +43,7 @@ import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -115,29 +116,12 @@ abstract class AbstractKafkaProcessor extends AbstractSessi
.expressionLanguageSupported(true)
.build();
- static final PropertyDescriptor SSL_KEY_PASSWORD = new PropertyDescriptor.Builder()
- .name("ssl.key.password")
- .displayName("SSL Key Password")
- .description("The password of the private key in the key store file. Corresponds to Kafka's 'ssl.key.password' property.")
+ static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("ssl.context.service")
+ .displayName("SSL Context Service")
+ .description("Specifies the SSL Context Service to use for communicating with Kafka.")
.required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .sensitive(true)
- .build();
- static final PropertyDescriptor SSL_KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
- .name("ssl.keystore.password")
- .displayName("SSK Keystore Password")
- .description("The store password for the key store file. Corresponds to Kafka's 'ssl.keystore.password' property.")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .sensitive(true)
- .build();
- static final PropertyDescriptor SSL_TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
- .name("ssl.truststore.password")
- .displayName("SSL Truststore Password")
- .description("The password for the trust store file. Corresponds to Kafka's 'ssl.truststore.password' property.")
- .required(false)
- .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .sensitive(true)
+ .identifiesControllerService(SSLContextService.class)
.build();
static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder()
@@ -166,9 +150,7 @@ abstract class AbstractKafkaProcessor extends AbstractSessi
SHARED_DESCRIPTORS.add(CLIENT_ID);
SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
- SHARED_DESCRIPTORS.add(SSL_KEY_PASSWORD);
- SHARED_DESCRIPTORS.add(SSL_KEYSTORE_PASSWORD);
- SHARED_DESCRIPTORS.add(SSL_TRUSTSTORE_PASSWORD);
+ SHARED_DESCRIPTORS.add(SSL_CONTEXT_SERVICE);
SHARED_RELATIONSHIPS.add(REL_SUCCESS);
}
@@ -347,6 +329,13 @@ abstract class AbstractKafkaProcessor extends AbstractSessi
Properties buildKafkaProperties(ProcessContext context) {
Properties properties = new Properties();
for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
+ if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
+ // Translate SSLContext Service configuration into Kafka properties
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ buildSSLKafkaProperties(sslContextService, properties);
+ continue;
+ }
+
String pName = propertyDescriptor.getName();
String pValue = propertyDescriptor.isExpressionLanguageSupported()
? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
@@ -360,4 +349,24 @@ abstract class AbstractKafkaProcessor extends AbstractSessi
}
return properties;
}
+
+ private void buildSSLKafkaProperties(final SSLContextService sslContextService, final Properties properties) {
+ if (sslContextService == null) {
+ return;
+ }
+
+ if (sslContextService.isKeyStoreConfigured()) {
+ properties.setProperty("ssl.keystore.location", sslContextService.getKeyStoreFile());
+ properties.setProperty("ssl.keystore.password", sslContextService.getKeyStorePassword());
+ final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
+ properties.setProperty("ssl.key.password", keyPass);
+ properties.setProperty("ssl.keystore.type", sslContextService.getKeyStoreType());
+ }
+
+ if (sslContextService.isTrustStoreConfigured()) {
+ properties.setProperty("ssl.truststore.location", sslContextService.getTrustStoreFile());
+ properties.setProperty("ssl.truststore.password", sslContextService.getTrustStorePassword());
+ properties.setProperty("ssl.truststore.type", sslContextService.getTrustStoreType());
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index 2bc1cfbe37..f51c064a07 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -242,7 +242,6 @@ public class ConsumeKafka extends AbstractKafkaProcessor