NIFI-2322, NIFI-2423, NIFI-2412 Kafka improvements

- Fixed KafkaConsumer's connection block when broker is not available
- Fixed Serializer/Deserializer configs in both Consume/Publish Kafka
- Added sensitive properties for SSL ket/trust stores

NIFI-2322 fixed tests
This commit is contained in:
Oleg Zhurakousky 2016-07-29 09:06:47 -04:00
parent 8d380dcdac
commit c39a127ec8
4 changed files with 86 additions and 5 deletions

View File

@ -115,6 +115,31 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor SSL_KEY_PASSWORD = new PropertyDescriptor.Builder()
.name("ssl.key.password")
.displayName("SSL Key Password")
.description("The password of the private key in the key store file. Corresponds to Kafka's 'ssl.key.password' property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor SSL_KEYSTORE_PASSWORD = new PropertyDescriptor.Builder()
.name("ssl.keystore.password")
.displayName("SSK Keystore Password")
.description("The store password for the key store file. Corresponds to Kafka's 'ssl.keystore.password' property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build();
static final PropertyDescriptor SSL_TRUSTSTORE_PASSWORD = new PropertyDescriptor.Builder()
.name("ssl.truststore.password")
.displayName("SSL Truststore Password")
.description("The password for the trust store file. Corresponds to Kafka's 'ssl.truststore.password' property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.sensitive(true)
.build();
static final Builder MESSAGE_DEMARCATOR_BUILDER = new PropertyDescriptor.Builder()
.name("message-demarcator")
.displayName("Message Demarcator")
@ -141,6 +166,10 @@ abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessi
SHARED_DESCRIPTORS.add(CLIENT_ID);
SHARED_DESCRIPTORS.add(SECURITY_PROTOCOL);
SHARED_DESCRIPTORS.add(KERBEROS_PRINCIPLE);
SHARED_DESCRIPTORS.add(SSL_KEY_PASSWORD);
SHARED_DESCRIPTORS.add(SSL_KEYSTORE_PASSWORD);
SHARED_DESCRIPTORS.add(SSL_TRUSTSTORE_PASSWORD);
SHARED_RELATIONSHIPS.add(REL_SUCCESS);
}

View File

@ -18,6 +18,8 @@ package org.apache.nifi.processors.kafka.pubsub;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@ -48,6 +50,7 @@ import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@ -228,16 +231,59 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]
: null;
this.topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
Properties kafkaProperties = this.buildKafkaProperties(context);
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
/*
* Since we are using unconventional way to validate if connectivity to
* broker is possible we need a mechanism to be able to disable it.
* 'check.connection' property will serve as such mechanism
*/
if (!kafkaProperties.getProperty("check.connection").equals("false")) {
this.checkIfInitialConnectionPossible();
}
System.out.println(kafkaProperties);
if (!kafkaProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) {
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
if (!kafkaProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) {
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
}
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(kafkaProperties);
consumer.subscribe(Collections.singletonList(this.topic));
return consumer;
}
/**
* Checks via brute force if it is possible to establish connection to at
* least one broker. If not this method will throw {@link ProcessException}.
*/
private void checkIfInitialConnectionPossible(){
String[] br = this.brokers.split(",");
boolean connectionPossible = false;
for (int i = 0; i < br.length && !connectionPossible; i++) {
String hostPortCombo = br[i];
String[] hostPort = hostPortCombo.split(":");
Socket client = null;
try {
client = new Socket();
client.connect(new InetSocketAddress(hostPort[0].trim(), Integer.parseInt(hostPort[1].trim())), 10000);
connectionPossible = true;
} catch (Exception e) {
this.logger.error("Connection to '" + hostPortCombo + "' is not possible", e);
} finally {
try {
client.close();
} catch (IOException e) {
// ignore
}
}
}
if (!connectionPossible){
throw new ProcessException("Connection to " + this.brokers + " is not possible. See logs for more details");
}
}
/**
* Will release flow file. Releasing of the flow file in the context of this
* operation implies the following:

View File

@ -225,8 +225,12 @@ public class PublishKafka extends AbstractKafkaProcessor<KafkaPublisher> {
@Override
protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session) {
Properties kafkaProperties = this.buildKafkaProperties(context);
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
if (!kafkaProperties.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) {
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
}
if (!kafkaProperties.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) {
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
}
this.brokers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, this.getLogger());
return publisher;

View File

@ -84,6 +84,7 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty("check.connection", "false");
byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8),
"Hello-2".getBytes(StandardCharsets.UTF_8), "Hello-3".getBytes(StandardCharsets.UTF_8) };
@ -130,6 +131,7 @@ public class ConsumeKafkaTest {
runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
runner.setProperty("check.connection", "false");
byte[][] values = new byte[][] { "Hello-1".getBytes(StandardCharsets.UTF_8),
"Hi-2".getBytes(StandardCharsets.UTF_8) };