diff --git a/assembly/pom.xml b/assembly/pom.xml
index ae74485deb..0d00da3ffb 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -203,6 +203,11 @@
             <artifactId>hadoop-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-nar</artifactId>
+            <type>nar</type>
+        </dependency>
diff --git a/nar-bundles/kafka-bundle/kafka-nar/pom.xml b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
new file mode 100644
index 0000000000..8dfefddedd
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kafka-nar</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Kafka NAR</name>
+    <packaging>nar</packaging>
+    <description>NiFi NAR for interacting with Apache Kafka</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-processors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nar-bundles/kafka-bundle/kafka-processors/pom.xml b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
new file mode 100644
index 0000000000..8cad323545
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <name>kafka-processors</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.8.2</artifactId>
+            <version>0.8.1</version>
+            <exclusions>
+                <!-- Transitive dependencies excluded from the Kafka artifact -->
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
new file mode 100644
index 0000000000..55c67e36d6
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -0,0 +1,259 @@
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@SupportsBatching
+@CapabilityDescription("Fetches messages from Apache Kafka")
+@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+public class GetKafka extends AbstractProcessor {
+ public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
+ .name("ZooKeeper Connection String")
+ .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of : combinations. For example, host1:2181,host2:2181,host3:2188")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+ .name("Topic Name")
+ .description("The Kafka Topic to pull messages from")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
+ .name("Zookeeper Commit Frequency")
+ .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("60 secs")
+ .build();
+ public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("ZooKeeper Communications Timeout")
+ .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("30 secs")
+ .build();
+ public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Kafka Communications Timeout")
+ .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("30 secs")
+ .build();
+ public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+ .name("Client Name")
+ .description("Client Name to use when communicating with Kafka")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("All FlowFiles that are created are routed to this relationship")
+ .build();
+
+
+ private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
+ private volatile ConsumerConnector consumer;
+
+ final Lock interruptionLock = new ReentrantLock();
+ // guarded by interruptionLock
+ private final Set<Thread> interruptableThreads = new HashSet<>();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CLIENT_NAME)
+ .defaultValue("NiFi-" + getIdentifier())
+ .build();
+
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(ZOOKEEPER_CONNECTION_STRING);
+ props.add(TOPIC);
+ props.add(ZOOKEEPER_COMMIT_DELAY);
+ props.add(clientNameWithDefault);
+ props.add(KAFKA_TIMEOUT);
+ props.add(ZOOKEEPER_TIMEOUT);
+ return props;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>(1);
+ relationships.add(REL_SUCCESS);
+ return relationships;
+ }
+
+ @OnScheduled
+ public void createConsumers(final ProcessContext context) {
+ final String topic = context.getProperty(TOPIC).getValue();
+
+ final Map<String, Integer> topicCountMap = new HashMap<>(1);
+ topicCountMap.put(topic, context.getMaxConcurrentTasks());
+
+ final Properties props = new Properties();
+ props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
+ props.setProperty("group.id", getIdentifier());
+ props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+ props.setProperty("auto.commit.enable", "true"); // just be explicit
+ props.setProperty("auto.offset.reset", "smallest");
+
+ final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+
+ final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+ final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+
+ this.streamIterators.clear();
+
+ for ( final KafkaStream<byte[], byte[]> stream : streams ) {
+ streamIterators.add(stream.iterator());
+ }
+ }
+
+ @OnStopped
+ public void shutdownConsumer() {
+ if ( consumer != null ) {
+ try {
+ consumer.commitOffsets();
+ } finally {
+ consumer.shutdown();
+ }
+ }
+ }
+
+ @OnUnscheduled
+ public void interruptIterators() {
+ // Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
+ // interrupt the Threads. We do this when the Processor is stopped so that we have the
+ // ability to shutdown the Processor.
+ interruptionLock.lock();
+ try {
+ for ( final Thread t : interruptableThreads ) {
+ t.interrupt();
+ }
+
+ interruptableThreads.clear();
+ } finally {
+ interruptionLock.unlock();
+ }
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+ if ( iterator == null ) {
+ return;
+ }
+
+ FlowFile flowFile = null;
+ try {
+ interruptionLock.lock();
+ try {
+ interruptableThreads.add(Thread.currentThread());
+ } finally {
+ interruptionLock.unlock();
+ }
+
+ try {
+ if (!iterator.hasNext() ) {
+ return;
+ }
+ } catch (final Exception e) {
+ getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e});
+ iterator = null;
+ return;
+ }
+
+ final long start = System.nanoTime();
+ final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+
+ if ( mam == null ) {
+ return;
+ }
+
+ final byte[] key = mam.key();
+
+ final Map<String, String> attributes = new HashMap<>();
+ if ( key != null ) {
+ attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+ }
+ attributes.put("kafka.offset", String.valueOf(mam.offset()));
+ attributes.put("kafka.partition", String.valueOf(mam.partition()));
+ attributes.put("kafka.topic", mam.topic());
+
+ flowFile = session.create();
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ out.write(mam.message());
+ }
+ });
+
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
+ getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final Exception e) {
+ getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
+ if ( flowFile != null ) {
+ session.remove(flowFile);
+ }
+ } finally {
+ interruptionLock.lock();
+ try {
+ interruptableThreads.remove(Thread.currentThread());
+ } finally {
+ interruptionLock.unlock();
+ }
+
+ if ( iterator != null ) {
+ streamIterators.offer(iterator);
+ }
+ }
+ }
+
+}
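
A note on the threading model in GetKafka: the Kafka 0.8 high-level consumer only exposes a blocking iterator, so the processor records which threads are parked in hasNext() and interrupts them when it is unscheduled. A minimal, dependency-free sketch of that same interrupt-on-stop pattern (hypothetical class, not part of this patch):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class InterruptionDemo {

        private static final Lock interruptionLock = new ReentrantLock();
        // guarded by interruptionLock, mirroring GetKafka.interruptableThreads
        private static final Set<Thread> interruptableThreads = new HashSet<>();

        public static void main(final String[] args) throws InterruptedException {
            final Thread worker = new Thread(new Runnable() {
                @Override
                public void run() {
                    // register this thread as interruptable before blocking
                    interruptionLock.lock();
                    try {
                        interruptableThreads.add(Thread.currentThread());
                    } finally {
                        interruptionLock.unlock();
                    }

                    try {
                        Thread.sleep(60000L); // stands in for the blocking ConsumerIterator.hasNext()
                    } catch (final InterruptedException e) {
                        System.out.println("worker interrupted; exiting cleanly");
                    }
                }
            });
            worker.start();
            Thread.sleep(500L);

            // the equivalent of interruptIterators(): wake every blocked thread
            interruptionLock.lock();
            try {
                for (final Thread t : interruptableThreads) {
                    t.interrupt();
                }
                interruptableThreads.clear();
            } finally {
                interruptionLock.unlock();
            }
            worker.join();
        }
    }
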
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
new file mode 100644
index 0000000000..5e5940cdfa
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
+public class PutKafka extends AbstractProcessor {
+ private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+ private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+
+ public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
+ public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
+ public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
+
+ public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
+ .name("Known Brokers")
+ .description("A comma-separated list of known Kafka Brokers in the format :")
+ .required(true)
+ .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+ .expressionLanguageSupported(false)
+ .build();
+ public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+ .name("Topic Name")
+ .description("The Kafka Topic of interest")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+ .name("Kafka Key")
+ .description("The Key to use for the Message")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+ .name("Delivery Guarantee")
+ .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+ .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+ .build();
+ public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Communications Timeout")
+ .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("30 secs")
+ .build();
+ public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
+ .name("Max FlowFile Size")
+ .description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("1 MB")
+ .build();
+ public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+ .name("Client Name")
+ .description("Client Name to use when communicating with Kafka")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+ .build();
+ public static final Relationship REL_REJECT = new Relationship.Builder()
+ .name("reject")
+ .description("Any FlowFile whose size exceeds the property will be routed to this Relationship")
+ .build();
+
+ private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
+ .fromPropertyDescriptor(CLIENT_NAME)
+ .defaultValue("NiFi-" + getIdentifier())
+ .build();
+
+ final List<PropertyDescriptor> props = new ArrayList<>();
+ props.add(SEED_BROKERS);
+ props.add(TOPIC);
+ props.add(KEY);
+ props.add(DELIVERY_GUARANTEE);
+ props.add(TIMEOUT);
+ props.add(MAX_FLOWFILE_SIZE);
+ props.add(clientName);
+ return props;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ final Set<Relationship> relationships = new HashSet<>(3);
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.add(REL_REJECT);
+ return relationships;
+ }
+
+
+ @OnStopped
+ public void closeProducers() {
+ Producer<byte[], byte[]> producer;
+
+ while ((producer = producers.poll()) != null) {
+ producer.close();
+ }
+ }
+
+
+ private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+ final String brokers = context.getProperty(SEED_BROKERS).getValue();
+
+ final Properties properties = new Properties();
+ properties.setProperty("metadata.broker.list", brokers);
+ properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
+ properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
+ properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
+
+ properties.setProperty("message.send.max.retries", "1");
+ properties.setProperty("producer.type", "sync");
+
+ final ProducerConfig config = new ProducerConfig(properties);
+ return new Producer<>(config);
+ }
+
+ private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
+ Producer<byte[], byte[]> producer = producers.poll();
+ return producer == null ? createProducer(context) : producer;
+ }
+
+ private void returnProducer(final Producer<byte[], byte[]> producer) {
+ producers.offer(producer);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ final long start = System.nanoTime();
+ final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
+ if ( flowFile.getSize() > maxSize ) {
+ getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
+ session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
+ session.transfer(flowFile, REL_REJECT);
+ return;
+ }
+
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+ final byte[] value = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, value);
+ }
+ });
+
+ final Producer<byte[], byte[]> producer = borrowProducer(context);
+ boolean error = false;
+ try {
+ final KeyedMessage<byte[], byte[]> message;
+ if ( key == null ) {
+ message = new KeyedMessage<>(topic, value);
+ } else {
+ message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
+ }
+
+ producer.send(message);
+ final long nanos = System.nanoTime() - start;
+
+ session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+ } catch (final Exception e) {
+ getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ error = true;
+ } finally {
+ if ( error ) {
+ producer.close();
+ } else {
+ returnProducer(producer);
+ }
+ }
+ }
+
+}
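
The Known Brokers validator above is a plain regular expression over host:port pairs (SINGLE_BROKER_REGEX / BROKER_REGEX). A quick, hypothetical sanity check of what it accepts, using the same expressions:

    import java.util.regex.Pattern;

    public class BrokerRegexDemo {
        // same expressions as PutKafka.SINGLE_BROKER_REGEX / BROKER_REGEX
        private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
        private static final String BROKER_REGEX =
                SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";

        public static void main(final String[] args) {
            final Pattern pattern = Pattern.compile(BROKER_REGEX);
            System.out.println(pattern.matcher("kafka1:9092").matches());              // true
            System.out.println(pattern.matcher("kafka1:9092, kafka2:9092").matches()); // true
            System.out.println(pattern.matcher("kafka1").matches());                   // false: no port
            System.out.println(pattern.matcher("kafka1:92").matches());                // false: port needs 3-5 digits
        }
    }
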
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..6ae3da1885
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,2 @@
+org.apache.nifi.processors.kafka.GetKafka
+org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file
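
The services file above is a standard java.util.ServiceLoader manifest, one implementation class per line; NiFi's NAR classloading relies on it to discover processors. A minimal sketch of that lookup, assuming nifi-api and the kafka-processors jar are on the classpath:

    import java.util.ServiceLoader;
    import org.apache.nifi.processor.Processor;

    public class ListKafkaProcessors {
        public static void main(final String[] args) {
            // reads every META-INF/services/org.apache.nifi.processor.Processor
            // visible to the current classloader, including the file added above
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }
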
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
new file mode 100644
index 0000000000..d429d6b5eb
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -0,0 +1,147 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>GetKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description:</h2>
+        <p>
+            This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+            for data. When a message is received from Kafka, this Processor emits a FlowFile
+            where the content of the FlowFile is the value of the Kafka message. If the
+            message has a key associated with it, an attribute named <code>kafka.key</code>
+            will be added to the FlowFile, with the value being the UTF-8 encoded value
+            of the Message's Key.
+        </p>
+        <p>
+            Kafka supports the notion of a Consumer Group when pulling messages in order to
+            provide scalability while still offering a publish-subscribe interface. Each
+            Consumer Group must have a unique identifier. The Consumer Group identifier that
+            is used by NiFi is the UUID of the Processor. This means that all of the nodes
+            within a cluster will use the same Consumer Group Identifier so that they do
+            not receive duplicate data, but multiple GetKafka Processors can be used to pull
+            from multiple Topics, as each Processor will receive a different Processor UUID
+            and therefore a different Consumer Group Identifier.
+        </p>
+
+        <h2>Modifies Attributes:</h2>
+        <table border="1">
+            <thead>
+                <tr>
+                    <th>Attribute Name</th>
+                    <th>Description</th>
+                </tr>
+            </thead>
+            <tbody>
+                <tr>
+                    <td>kafka.key</td>
+                    <td>The key of the Kafka message, if it exists. If the message does not have a key,
+                        this attribute will not be added.</td>
+                </tr>
+                <tr>
+                    <td>kafka.topic</td>
+                    <td>The name of the Kafka Topic from which the message was received</td>
+                </tr>
+                <tr>
+                    <td>kafka.partition</td>
+                    <td>The partition of the Kafka Topic from which the message was received</td>
+                </tr>
+                <tr>
+                    <td>kafka.offset</td>
+                    <td>The offset of the message within the Kafka partition</td>
+                </tr>
+            </tbody>
+        </table>
+
+        <h2>Properties:</h2>
+        <p>
+            In the list below, the names of required properties appear
+            in <strong>bold</strong>. Any other properties (not in bold) are considered optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.
+        </p>
+        <ul>
+            <li><strong>ZooKeeper Connection String</strong>
+                <ul>
+                    <li>The Connection String to use in order to connect to ZooKeeper. This is often a
+                        comma-separated list of &lt;host&gt;:&lt;port&gt; combinations. For example,
+                        host1:2181,host2:2181,host3:2188</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Topic Name</strong>
+                <ul>
+                    <li>The Kafka Topic to pull messages from</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Zookeeper Commit Frequency</strong>
+                <ul>
+                    <li>Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled.
+                        A longer time period will result in better overall performance but can result in more data
+                        duplication if a NiFi node is lost</li>
+                    <li>Default value: 60 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>ZooKeeper Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from ZooKeeper before determining that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Kafka Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from Kafka before determining that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Client Name</strong>
+                <ul>
+                    <li>Client Name to use when communicating with Kafka</li>
+                    <li>Default value: "NiFi-" followed by the UUID of the Processor</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+        </ul>
+
+        <h2>Relationships:</h2>
+        <ul>
+            <li>success
+                <ul>
+                    <li>All messages that are received from Kafka are routed to the 'success' relationship.</li>
+                </ul>
+            </li>
+        </ul>
+    </body>
+</html>
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
new file mode 100644
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -0,0 +1,157 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PutKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <h2>Description:</h2>
+        <p>
+            This Processor puts the contents of a FlowFile to a Topic in
+            <a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
+            a FlowFile becomes the contents of a single message in Kafka.
+            This message is optionally assigned a key by using the
+            &lt;Kafka Key&gt; Property.
+        </p>
+
+        <h2>Properties:</h2>
+        <p>
+            In the list below, the names of required properties appear
+            in <strong>bold</strong>. Any other properties (not in bold) are considered optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.
+        </p>
+        <ul>
+            <li><strong>Known Brokers</strong>
+                <ul>
+                    <li>A comma-separated list of known Kafka Brokers in the format
+                        &lt;host&gt;:&lt;port&gt;. This list does not need to be
+                        exhaustive but provides a mechanism for determining which
+                        other nodes belong to the Kafka cluster.</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Topic Name</strong>
+                <ul>
+                    <li>The Kafka Topic to send messages to. While the GetKafka
+                        Processor requires a statically named Topic so that it knows
+                        where to fetch messages from, the PutKafka Processor does allow
+                        the Expression Language to be used so that a single PutKafka
+                        Processor can be used to send messages to many different Kafka
+                        topics.</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
+            <li>Kafka Key
+                <ul>
+                    <li>The Key to use for the Message. If no value is given, the message
+                        will not be given a Key.</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
+            <li><strong>Delivery Guarantee</strong>
+                <ul>
+                    <li>Specifies the requirement for guaranteeing that a message is sent to Kafka.
+                        This Property can have one of three different values:
+                        <ul>
+                            <li><strong>Guarantee Replicated Delivery</strong> - FlowFile will be routed to
+                                failure unless the message is replicated to the appropriate number
+                                of Kafka Nodes according to the Topic configuration</li>
+                            <li><strong>Guarantee Single Node Delivery</strong> - FlowFile will be routed to
+                                success if the message is received by a single Kafka node,
+                                whether or not it is replicated. This is faster than
+                                &lt;Guarantee Replicated Delivery&gt; but can result in data loss
+                                if a Kafka node crashes</li>
+                            <li><strong>Best Effort</strong> - FlowFile will be routed to success after successfully
+                                writing the content to a Kafka node, without waiting for a response.
+                                This provides the best performance but may result in data loss.</li>
+                        </ul>
+                    </li>
+                    <li>Default value: Best Effort</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from Kafka before determining
+                        that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Max FlowFile Size</strong>
+                <ul>
+                    <li>Specifies the amount of data that can be buffered to send to Kafka. Because
+                        the contents of the FlowFile must be buffered into memory before they can
+                        be sent to Kafka, attempting to send a very large FlowFile can cause
+                        the machine to run out of memory. To help prevent the system from running
+                        out of memory, the PutKafka Processor exposes a property for specifying the
+                        maximum size of a FlowFile. If the size of a FlowFile is larger than this,
+                        that FlowFile will be routed to 'reject'.</li>
+                    <li>Default value: 1 MB</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Client Name</strong>
+                <ul>
+                    <li>Client Name to use when communicating with Kafka</li>
+                    <li>Default value: "NiFi-" followed by the UUID of the Processor</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+        </ul>
+
+        <h2>Relationships:</h2>
+        <ul>
+            <li>success
+                <ul>
+                    <li>All FlowFiles that are successfully sent to Kafka are routed
+                        to this relationship.</li>
+                </ul>
+            </li>
+            <li>reject
+                <ul>
+                    <li>Any FlowFile whose content size exceeds the configured value for
+                        the &lt;Max FlowFile Size&gt; property will be routed to this
+                        relationship.</li>
+                </ul>
+            </li>
+            <li>failure
+                <ul>
+                    <li>All FlowFiles that cannot be sent to Kafka for any reason other
+                        than their content size exceeding the value of the &lt;Max FlowFile
+                        Size&gt; property will be routed to this relationship.</li>
+                </ul>
+            </li>
+        </ul>
+    </body>
+</html>
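
For reference, the three Delivery Guarantee values documented above map one-to-one onto the Kafka 0.8 producer's request.required.acks setting (-1, 1, 0), exactly as createProducer() wires them. A standalone sketch of the equivalent raw producer configuration (hypothetical class; broker address assumed):

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class AcksDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.setProperty("metadata.broker.list", "kafka1:9092");
            // "-1" = Guarantee Replicated Delivery, "1" = Guarantee Single Node Delivery,
            // "0" = Best Effort; the same values carried by PutKafka's AllowableValues
            props.setProperty("request.required.acks", "-1");
            props.setProperty("producer.type", "sync");

            final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(props));
            producer.send(new KeyedMessage<byte[], byte[]>("test", "hello".getBytes()));
            producer.close();
        }
    }
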
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
new file mode 100644
index 0000000000..2199a9ce7a
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.List;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Intended only for local tests to verify functionality.")
+public class TestGetKafka {
+
+ public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
+
+ @BeforeClass
+ public static void configureLogging() {
+ System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO");
+ BasicConfigurator.configure();
+ }
+
+ @Test
+ public void testIntegrationLocally() {
+ final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
+ runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+ runner.setProperty(GetKafka.TOPIC, "testX");
+ runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+ runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+
+ runner.run(20, false);
+
+ final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
+ for ( final MockFlowFile flowFile : flowFiles ) {
+ System.out.println(flowFile.getAttributes());
+ System.out.println(new String(flowFile.toByteArray()));
+ System.out.println();
+ }
+ }
+
+}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
new file mode 100644
index 0000000000..2e6aacfd7d
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -0,0 +1,48 @@
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore("Intended only for local testing to verify functionality.")
+public class TestPutKafka {
+
+ @Test
+ public void testKeyValuePut() {
+ final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+ runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+ runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+ runner.setProperty(PutKafka.KEY, "${kafka.key}");
+ runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+ runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("kafka.topic", "test");
+ attributes.put("kafka.key", "key3");
+
+ final byte[] data = "Hello, World, Again! ;)".getBytes();
+ runner.enqueue(data, attributes);
+ runner.enqueue(data, attributes);
+ runner.enqueue(data, attributes);
+ runner.enqueue(data, attributes);
+
+ runner.run(5);
+
+ runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
+ final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+ final MockFlowFile mff = mffs.get(0);
+
+ assertTrue(Arrays.equals(data, mff.toByteArray()));
+ }
+
+}
diff --git a/nar-bundles/kafka-bundle/pom.xml b/nar-bundles/kafka-bundle/pom.xml
new file mode 100644
index 0000000000..146db1295f
--- /dev/null
+++ b/nar-bundles/kafka-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nar-bundle-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <name>kafka-bundle</name>
+
+    <modules>
+        <module>kafka-processors</module>
+        <module>kafka-nar</module>
+    </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index 1de1a0e28e..68e718a0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -766,6 +766,12 @@
                 <version>${project.version}</version>
                 <type>nar</type>
             </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>kafka-nar</artifactId>
+                <version>${project.version}</version>
+                <type>nar</type>
+            </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>