NIFI-220: Initial Import of GetKafka and PutKafka processors

This commit is contained in:
Mark Payne 2015-01-01 20:52:36 -05:00
parent 68b7ad78cc
commit 3e2f790672
12 changed files with 1103 additions and 0 deletions
assembly
nar-bundles/kafka-bundle
kafka-nar
kafka-processors
pom.xml
src
main
java/org/apache/nifi/processors/kafka
resources
META-INF/services
docs
org.apache.nifi.processors.kafka.GetKafka
org.apache.nifi.processors.kafka.PutKafka
test/java/org/apache/nifi/processors/kafka
pom.xml
pom.xml

View File

@ -203,6 +203,11 @@
<artifactId>hadoop-nar</artifactId>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>kafka-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies>
<properties>

View File

@ -0,0 +1,37 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>kafka-bundle</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>kafka-nar</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>NiFi Kafka NAR</name>
<packaging>nar</packaging>
<description>NiFi NAR for interacting with Apache Kafka</description>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>kafka-processors</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,76 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>kafka-bundle</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-processors</artifactId>
<packaging>jar</packaging>
<name>kafka-processors</name>
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<!-- Transitive dependencies excluded because they are located
in a legacy Maven repository, which Maven 3 doesn't support. -->
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,259 @@
package org.apache.nifi.processors.kafka;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.annotation.CapabilityDescription;
import org.apache.nifi.processor.annotation.OnScheduled;
import org.apache.nifi.processor.annotation.OnStopped;
import org.apache.nifi.processor.annotation.OnUnscheduled;
import org.apache.nifi.processor.annotation.SupportsBatching;
import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@CapabilityDescription("Fetches messages from Apache Kafka")
@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
public class GetKafka extends AbstractProcessor {
public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
.name("ZooKeeper Connection String")
.description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name")
.description("The Kafka Topic to pull messages from")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
.name("Zookeeper Commit Frequency")
.description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("60 secs")
.build();
public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
.name("ZooKeeper Communications Timeout")
.description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
.name("Kafka Communications Timeout")
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles that are created are routed to this relationship")
.build();
private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
private volatile ConsumerConnector consumer;
final Lock interruptionLock = new ReentrantLock();
// guarded by interruptionLock
private final Set<Thread> interruptableThreads = new HashSet<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier())
.build();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(ZOOKEEPER_CONNECTION_STRING);
props.add(TOPIC);
props.add(ZOOKEEPER_COMMIT_DELAY);
props.add(clientNameWithDefault);
props.add(KAFKA_TIMEOUT);
props.add(ZOOKEEPER_TIMEOUT);
return props;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>(1);
relationships.add(REL_SUCCESS);
return relationships;
}
@OnScheduled
public void createConsumers(final ProcessContext context) {
final String topic = context.getProperty(TOPIC).getValue();
final Map<String, Integer> topicCountMap = new HashMap<>(1);
topicCountMap.put(topic, context.getMaxConcurrentTasks());
final Properties props = new Properties();
props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
props.setProperty("group.id", getIdentifier());
props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
props.setProperty("auto.commit.enable", "true"); // just be explicit
props.setProperty("auto.offset.reset", "smallest");
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
this.streamIterators.clear();
for ( final KafkaStream<byte[], byte[]> stream : streams ) {
streamIterators.add(stream.iterator());
}
}
@OnStopped
public void shutdownConsumer() {
if ( consumer != null ) {
try {
consumer.commitOffsets();
} finally {
consumer.shutdown();
}
}
}
@OnUnscheduled
public void interruptIterators() {
// Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
// interrupt the Threads. We do this when the Processor is stopped so that we have the
// ability to shutdown the Processor.
interruptionLock.lock();
try {
for ( final Thread t : interruptableThreads ) {
t.interrupt();
}
interruptableThreads.clear();
} finally {
interruptionLock.unlock();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
if ( iterator == null ) {
return;
}
FlowFile flowFile = null;
try {
interruptionLock.lock();
try {
interruptableThreads.add(Thread.currentThread());
} finally {
interruptionLock.unlock();
}
try {
if (!iterator.hasNext() ) {
return;
}
} catch (final Exception e) {
getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e});
iterator = null;
return;
}
final long start = System.nanoTime();
final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
if ( mam == null ) {
return;
}
final byte[] key = mam.key();
final Map<String, String> attributes = new HashMap<>();
if ( key != null ) {
attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
}
attributes.put("kafka.offset", String.valueOf(mam.offset()));
attributes.put("kafka.partition", String.valueOf(mam.partition()));
attributes.put("kafka.topic", mam.topic());
flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
out.write(mam.message());
}
});
flowFile = session.putAllAttributes(flowFile, attributes);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
session.transfer(flowFile, REL_SUCCESS);
} catch (final Exception e) {
getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
if ( flowFile != null ) {
session.remove(flowFile);
}
} finally {
interruptionLock.lock();
try {
interruptableThreads.remove(Thread.currentThread());
} finally {
interruptionLock.unlock();
}
if ( iterator != null ) {
streamIterators.offer(iterator);
}
}
}
}

View File

@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.annotation.CapabilityDescription;
import org.apache.nifi.processor.annotation.OnStopped;
import org.apache.nifi.processor.annotation.SupportsBatching;
import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.StreamUtils;
@SupportsBatching
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name")
.description("The Kafka Topic of interest")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Kafka Key")
.description("The Key to use for the Message")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
.name("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
.required(true)
.expressionLanguageSupported(false)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
.name("Max FlowFile Size")
.description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build();
public static final Relationship REL_REJECT = new Relationship.Builder()
.name("reject")
.description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
.build();
private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(CLIENT_NAME)
.defaultValue("NiFi-" + getIdentifier())
.build();
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEED_BROKERS);
props.add(TOPIC);
props.add(KEY);
props.add(DELIVERY_GUARANTEE);
props.add(TIMEOUT);
props.add(MAX_FLOWFILE_SIZE);
props.add(clientName);
return props;
}
@Override
public Set<Relationship> getRelationships() {
final Set<Relationship> relationships = new HashSet<>(1);
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
relationships.add(REL_REJECT);
return relationships;
}
@OnStopped
public void closeProducers() {
Producer<byte[], byte[]> producer;
while ((producer = producers.poll()) != null) {
producer.close();
}
}
private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
final String brokers = context.getProperty(SEED_BROKERS).getValue();
final Properties properties = new Properties();
properties.setProperty("metadata.broker.list", brokers);
properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", "sync");
final ProducerConfig config = new ProducerConfig(properties);
return new Producer<>(config);
}
private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
Producer<byte[], byte[]> producer = producers.poll();
return producer == null ? createProducer(context) : producer;
}
private void returnProducer(final Producer<byte[], byte[]> producer) {
producers.offer(producer);
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
return;
}
final long start = System.nanoTime();
final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
if ( flowFile.getSize() > maxSize ) {
getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
session.transfer(flowFile, REL_REJECT);
return;
}
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final byte[] value = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(final InputStream in) throws IOException {
StreamUtils.fillBuffer(in, value);
}
});
final Producer<byte[], byte[]> producer = borrowProducer(context);
boolean error = false;
try {
final KeyedMessage<byte[], byte[]> message;
if ( key == null ) {
message = new KeyedMessage<>(topic, value);
} else {
message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
}
producer.send(message);
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
} catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
session.transfer(flowFile, REL_FAILURE);
error = true;
} finally {
if ( error ) {
producer.close();
} else {
returnProducer(producer);
}
}
}
}

View File

@ -0,0 +1,2 @@
org.apache.nifi.processors.kafka.GetKafka
org.apache.nifi.processors.kafka.PutKafka

View File

@ -0,0 +1,147 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>GetKafka</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This Processors polls <a href="http://kafka.apache.org/">Apache Kafka</a>
for data. When a message is received from Kafka, this Processor emits a FlowFile
where the content of the FlowFile is the value of the Kafka message. If the
message has a key associated with it, an attribute named <code>kafka.key</code>
will be added to the FlowFile, with the value being the UTF-8 Encoded value
of the Message's Key.
</p>
<p>
Kafka supports the notion of a Consumer Group when pulling messages in order to
provide scalability while still offering a publish-subscribe interface. Each
Consumer Group must have a unique identifier. The Consumer Group identifier that
is used by NiFi is the UUID of the Processor. This means that all of the nodes
within a cluster will use the same Consumer Group Identifier so that they do
not receive duplicate data but multiple GetKafka Processors can be used to pull
from multiple Topics, as each Processor will receive a different Processor UUID
and therefore a different Consumer Group Identifier.
</p>
<p>
<strong>Modifies Attributes:</strong>
</p>
<table border="1">
<thead>
<tr>
<th>Attribute Name</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>kafka.key</td>
<td>The key of the Kafka message, if it exists. If the message does not have a key,
this attribute will not be added.</td>
</tr>
<tr>
<td>kafka.topic</td>
<td>The name of the Kafka Topic from which the message was received</td>
</tr>
<tr>
<td>kafka.partition</td>
<td>The partition of the Kafka Topic from which the message was received</td>
</tr>
<tr>
<td>kafka.offset</td>
<td>The offset of the message within the Kafka partition</td>
</tr>
</tbody>
</table>
<p>
<strong>Properties:</strong>
</p>
<p>In the list below, the names of required properties appear
in bold. Any other properties (not in bold) are considered optional.
If a property has a default value, it is indicated. If a property
supports the use of the NiFi Expression Language (or simply,
"expression language"), that is also indicated.</p>
<ul>
<li><strong>ZooKeeper Connection String</strong>
<ul>
<li>The Connection String to use in order to connect to ZooKeeper. This is often a
comma-separated list of &lt;host&gt;:&lt;port&gt; combinations. For example,
host1:2181,host2:2181,host3:2188</li>
<li>Default value: no default</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Topic Name</strong>
<ul>
<li>The Kafka Topic to pull messages from</li>
<li>Default value: no default</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Zookeeper Commit Frequency</strong>
<ul>
<li>Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled.
A longer time period will result in better overall performance but can result in more data
duplication if a NiFi node is lost
</li>
<li>Default value: 60 secs</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>ZooKeeper Communications Timeout</strong>
<ul>
<li>The amount of time to wait for a response from ZooKeeper before determining that there is a communications error</li>
<li>Default value: 30 secs</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Kafka Communications Timeout</strong>
<ul>
<li>The amount of time to wait for a response from Kafka before determining that there is a communications error</li>
<li>Default value: 30 secs</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Client Name</strong>
<ul>
<li>Client Name to use when communicating with Kafka</li>
<li>Default value: "NiFi-" followed by the UUID of the Processor</li>
<li>Supports expression language: false</li>
</ul>
</li>
</ul>
<p>
<strong>Relationships:</strong>
</p>
<ul>
<li>success
<ul>
<li>All messages that are received from Kafka are routed to the 'success' relationship</li>
</ul>
</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,177 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>PutKafka</title>
<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
</head>
<body>
<!-- Processor Documentation ================================================== -->
<h2>Description:</h2>
<p>
This Processors puts the contents of a FlowFile to a Topic in
<a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
a FlowFile becomes the contents of a single message in Kafka.
This message is optionally assigned a key by using the
&lt;Kafka Key&gt; Property.
</p>
<p>
<strong>Properties:</strong>
</p>
<p>In the list below, the names of required properties appear
in bold. Any other properties (not in bold) are considered optional.
If a property has a default value, it is indicated. If a property
supports the use of the NiFi Expression Language (or simply,
"expression language"), that is also indicated.</p>
<ul>
<li><strong>Known Brokers</strong>
<ul>
<li>
A comma-separated list of known Kafka Brokers in the format
&lgt;host&gt;:&lt;port&gt;. This list does not need to be
exhaustive but provides a mechanism for determining which
other nodes belong to the Kafka cluster.
</li>
<li>Default value: no default</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Topic Name</strong>
<ul>
<li>The Kafka Topic to send messages to. While the GetKafka
Processor requires a statically named Topic so that it knows
where to fetch messages from, the PutKafka Processor does allow
the Expression Language to be used so that a single PutKafka
Processor can be used to send messages to many different Kafka
topics.
</li>
<li>Default value: no default</li>
<li>Supports expression language: true</li>
</ul>
</li>
<li>Kafka Key
<ul>
<li>
The Key to use for the Message. If no value is given, the message
will not be given a Key.
</li>
<li>Default value: no default</li>
<li>Supports expression language: true</li>
</ul>
</li>
<li><strong>Delivery Guarantee</strong>
<ul>
<li>
Specifies the requirement for guaranteeing that a message is sent to Kafka.
This Property can have one of three different values:
<ul>
<li>
<b>Guarantee Replicated Delivery</b> - FlowFile will be routed to
failure unless the message is replicated to the appropriate number
of Kafka Nodes according to the Topic configuration
</li>
<li>
<b>Guarantee Single Node Delivery</b> - FlowFile will be routed to
success if the message is received by a single Kafka node,
whether or not it is replicated. This is faster than
&lt;Guarantee Replicated Delivery&gt; but can result in data loss
if a Kafka node crashes
</li>
<li>
<b>Best Effort</b> - FlowFile will be routed to success after successfully
writing the content to a Kafka node, without waiting for a response.
This provides the best performance but may result in data loss.
</li>
</ul>
</li>
<li>Default value: Best Effort</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Communications Timeout</strong>
<ul>
<li>
The amount of time to wait for a response from Kafka before determining
that there is a communications error
</li>
<li>Default value: 30 secs</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Max FlowFile Size</strong>
<ul>
<li>
Specifies the amount of data that can be buffered to send to Kafka. Because
the contents of the FlowFile must be buffered into memory before they can
be sent to Kafka, attempting to send a very large FlowFile can cause
problems by causing the machine to run out of memory.
This helps to prevent the system from running out of memory, the PutKafka
Processor exposes a property for specifying the maximum size of a FlowFile.
If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
</li>
<li>Default value: 1 MB</li>
<li>Supports expression language: false</li>
</ul>
</li>
<li><strong>Client Name</strong>
<ul>
<li>Client Name to use when communicating with Kafka</li>
<li>Default value: "NiFi-" followed by the UUID of the Processor</li>
<li>Supports expression language: false</li>
</ul>
</li>
</ul>
<p>
<strong>Relationships:</strong>
</p>
<ul>
<li>success
<ul>
<li>All FlowFiles that are successfully sent to Kafka are routed
to this relationship.
</li>
</ul>
</li>
<li>reject
<ul>
<li>Any FlowFile whose content size exceeds the configured value for
the &lt;Max FlowFile Size&gt; property will be routed to this
relationship.
</li>
</ul>
</li>
<li>failure
<ul>
<li>All FlowFiles that cannot be sent to Kafka for any reason other
than their content size exceeding the value of the &lt;Max FlowFile
Size&gt; property will be routed to this relationship.
</li>
</ul>
</li>
</ul>
</body>
</html>

View File

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.util.List;
import org.apache.log4j.BasicConfigurator;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("Intended only for local tests to verify functionality.")
public class TestGetKafka {
public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
@BeforeClass
public static void configureLogging() {
System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO");
BasicConfigurator.configure();
}
@Test
public void testIntegrationLocally() {
final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
runner.setProperty(GetKafka.TOPIC, "testX");
runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
runner.run(20, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
for ( final MockFlowFile flowFile : flowFiles ) {
System.out.println(flowFile.getAttributes());
System.out.println(new String(flowFile.toByteArray()));
System.out.println();
}
}
}

View File

@ -0,0 +1,48 @@
package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("Intended only for local testing to verify functionality.")
public class TestPutKafka {
@Test
public void testKeyValuePut() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", "test");
attributes.put("kafka.key", "key3");
final byte[] data = "Hello, World, Again! ;)".getBytes();
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.run(5);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
final MockFlowFile mff = mffs.get(0);
assertTrue(Arrays.equals(data, mff.toByteArray()));
}
}

View File

@ -0,0 +1,35 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nar-bundle-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-bundle</artifactId>
<packaging>pom</packaging>
<name>kafka-bundle</name>
<modules>
<module>kafka-processors</module>
<module>kafka-nar</module>
</modules>
</project>

View File

@ -766,6 +766,12 @@
<version>${project.version}</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>kafka-nar</artifactId>
<version>${project.version}</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>