From 3e2f79067251adfd2846a3aea42c052e7cae8e92 Mon Sep 17 00:00:00 2001
From: Mark Payne
Date: Thu, 1 Jan 2015 20:52:36 -0500
Subject: [PATCH] NIFI-220: Initial Import of GetKafka and PutKafka processors

---
 assembly/pom.xml                                 |   5 +
 nar-bundles/kafka-bundle/kafka-nar/pom.xml       |  37 +++
 .../kafka-bundle/kafka-processors/pom.xml        |  76 +++++
 .../nifi/processors/kafka/GetKafka.java          | 259 ++++++++++++++++++
 .../nifi/processors/kafka/PutKafka.java          | 252 +++++++++++++++++
 .../org.apache.nifi.processor.Processor          |   2 +
 .../index.html                                   | 147 ++++++++++
 .../index.html                                   | 177 ++++++++++++
 .../nifi/processors/kafka/TestGetKafka.java      |  59 ++++
 .../nifi/processors/kafka/TestPutKafka.java      |  48 ++++
 nar-bundles/kafka-bundle/pom.xml                 |  35 +++
 pom.xml                                          |   6 +
 12 files changed, 1103 insertions(+)
 create mode 100644 nar-bundles/kafka-bundle/kafka-nar/pom.xml
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/pom.xml
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
 create mode 100644 nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
 create mode 100644 nar-bundles/kafka-bundle/pom.xml

diff --git a/assembly/pom.xml b/assembly/pom.xml
index ae74485deb..0d00da3ffb 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -203,6 +203,11 @@
             <artifactId>hadoop-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-nar</artifactId>
+            <type>nar</type>
+        </dependency>
diff --git a/nar-bundles/kafka-bundle/kafka-nar/pom.xml b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
new file mode 100644
index 0000000000..8dfefddedd
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kafka-nar</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Kafka NAR</name>
+    <packaging>nar</packaging>
+    <description>NiFi NAR for interacting with Apache Kafka</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-processors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nar-bundles/kafka-bundle/kafka-processors/pom.xml b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
new file mode 100644
index 0000000000..8cad323545
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <name>kafka-processors</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.8.2</artifactId>
+            <version>0.8.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
new file mode 100644
index 0000000000..55c67e36d6
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -0,0 +1,259 @@
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@SupportsBatching
+@CapabilityDescription("Fetches messages from Apache Kafka")
+@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+public class GetKafka extends AbstractProcessor {
+    public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
+        .name("ZooKeeper Connection String")
+        .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("Topic Name")
+        .description("The Kafka Topic to pull messages from")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
+        .name("Zookeeper Commit Frequency")
+        .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("60 secs")
+        .build();
+    public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
+        .name("ZooKeeper Communications Timeout")
+        .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("30 secs")
+        .build();
+    public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Kafka Communications Timeout")
+        .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("30 secs")
+        .build();
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+        .name("Client Name")
+        .description("Client Name to use when communicating with Kafka")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles that are created are routed to this relationship")
+        .build();
+
+    // one stream iterator per concurrent task; onTrigger() borrows an iterator,
+    // pulls a single message, and returns the iterator to the queue
+    private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
+    private volatile ConsumerConnector consumer;
+
+    final Lock interruptionLock = new ReentrantLock();
+    // guarded by interruptionLock
+    private final Set<Thread> interruptableThreads = new HashSet<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CLIENT_NAME)
+            .defaultValue("NiFi-" + getIdentifier())
+            .build();
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ZOOKEEPER_CONNECTION_STRING);
+        props.add(TOPIC);
+        props.add(ZOOKEEPER_COMMIT_DELAY);
+        props.add(clientNameWithDefault);
+        props.add(KAFKA_TIMEOUT);
+        props.add(ZOOKEEPER_TIMEOUT);
+        return props;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>(1);
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+
+    @OnScheduled
+    public void createConsumers(final ProcessContext context) {
+        final String topic = context.getProperty(TOPIC).getValue();
+
+        final Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, context.getMaxConcurrentTasks());
+
+        final Properties props = new Properties();
+        props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
+        props.setProperty("group.id", getIdentifier());
+        props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+        props.setProperty("auto.commit.enable", "true"); // just be explicit
+        props.setProperty("auto.offset.reset", "smallest");
+
+        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+
+        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+
+        this.streamIterators.clear();
+
+        for ( final KafkaStream<byte[], byte[]> stream : streams ) {
+            streamIterators.add(stream.iterator());
+        }
+    }
+
+    @OnStopped
+    public void shutdownConsumer() {
+        if ( consumer != null ) {
+            try {
+                consumer.commitOffsets();
+            } finally {
+                consumer.shutdown();
+            }
+        }
+    }
+
+    @OnUnscheduled
+    public void interruptIterators() {
+        // Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
+        // interrupt the Threads. We do this when the Processor is stopped so that we have the
+        // ability to shutdown the Processor.
+        interruptionLock.lock();
+        try {
+            for ( final Thread t : interruptableThreads ) {
+                t.interrupt();
+            }
+
+            interruptableThreads.clear();
+        } finally {
+            interruptionLock.unlock();
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+        if ( iterator == null ) {
+            return;
+        }
+
+        FlowFile flowFile = null;
+        try {
+            // register this thread so that it can be interrupted when the Processor is unscheduled
+            interruptionLock.lock();
+            try {
+                interruptableThreads.add(Thread.currentThread());
+            } finally {
+                interruptionLock.unlock();
+            }
+
+            try {
+                if ( !iterator.hasNext() ) {
+                    return;
+                }
+            } catch (final Exception e) {
+                getLogger().warn("Failed to invoke hasNext() due to {}", new Object[] {e});
+                iterator = null;
+                return;
+            }
+
+            final long start = System.nanoTime();
+            final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+
+            if ( mam == null ) {
+                return;
+            }
+
+            final byte[] key = mam.key();
+
+            final Map<String, String> attributes = new HashMap<>();
+            if ( key != null ) {
+                attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+            }
+            attributes.put("kafka.offset", String.valueOf(mam.offset()));
+            attributes.put("kafka.partition", String.valueOf(mam.partition()));
+            attributes.put("kafka.topic", mam.topic());
+
+            flowFile = session.create();
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(mam.message());
+                }
+            });
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
+            getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final Exception e) {
+            getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
+            if ( flowFile != null ) {
+                session.remove(flowFile);
+            }
+        } finally {
+            // deregister the thread and return the iterator for the next trigger
+            interruptionLock.lock();
+            try {
+                interruptableThreads.remove(Thread.currentThread());
+            } finally {
+                interruptionLock.unlock();
+            }
+
+            if ( iterator != null ) {
+                streamIterators.offer(iterator);
+            }
+        }
+    }
+
+}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
new file mode 100644
index 0000000000..5e5940cdfa
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
+public class PutKafka extends AbstractProcessor {
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+
+    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
+
+    public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
+        .name("Known Brokers")
+        .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+        .required(true)
+        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("Topic Name")
+        .description("The Kafka Topic of interest")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+        .name("Kafka Key")
+        .description("The Key to use for the Message")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+        .name("Delivery Guarantee")
+        .description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+        .required(true)
+        .expressionLanguageSupported(false)
+        .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+        .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+        .build();
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+        .name("Communications Timeout")
+        .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+        .required(true)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("30 secs")
+        .build();
+    public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
+        .name("Max FlowFile Size")
+        .description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1 MB")
+        .build();
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+        .name("Client Name")
+        .description("Client Name to use when communicating with Kafka")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+        .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+        .build();
+    public static final Relationship REL_REJECT = new Relationship.Builder()
+        .name("reject")
+        .description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
+        .build();
+
+    // pool of producers; a producer is borrowed for each onTrigger() invocation and
+    // returned afterward, or closed if an error occurred while sending
+    private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CLIENT_NAME)
+            .defaultValue("NiFi-" + getIdentifier())
+            .build();
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(SEED_BROKERS);
+        props.add(TOPIC);
+        props.add(KEY);
+        props.add(DELIVERY_GUARANTEE);
+        props.add(TIMEOUT);
+        props.add(MAX_FLOWFILE_SIZE);
+        props.add(clientName);
+        return props;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>(3);
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_REJECT);
+        return relationships;
+    }
+
+
+    @OnStopped
+    public void closeProducers() {
+        Producer<byte[], byte[]> producer;
+
+        while ((producer = producers.poll()) != null) {
+            producer.close();
+        }
+    }
+
+
+    private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+        final String brokers = context.getProperty(SEED_BROKERS).getValue();
+
+        final Properties properties = new Properties();
+        properties.setProperty("metadata.broker.list", brokers);
+        properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
+        properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
+        properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
+
+        properties.setProperty("message.send.max.retries", "1");
+        properties.setProperty("producer.type", "sync");
+
+        final ProducerConfig config = new ProducerConfig(properties);
+        return new Producer<>(config);
+    }
+
+    private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
+        final Producer<byte[], byte[]> producer = producers.poll();
+        return producer == null ? createProducer(context) : producer;
+    }
+
+    private void returnProducer(final Producer<byte[], byte[]> producer) {
+        producers.offer(producer);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final long start = System.nanoTime();
+        final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
+        if ( flowFile.getSize() > maxSize ) {
+            getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
+            session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
+            session.transfer(flowFile, REL_REJECT);
+            return;
+        }
+
+        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        // the entire FlowFile content is buffered into memory before it is sent to Kafka
+        final byte[] value = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, value);
+            }
+        });
+
+        final Producer<byte[], byte[]> producer = borrowProducer(context);
+        boolean error = false;
+        try {
+            final KeyedMessage<byte[], byte[]> message;
+            if ( key == null ) {
+                message = new KeyedMessage<>(topic, value);
+            } else {
+                message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
+            }
+
+            producer.send(message);
+            final long nanos = System.nanoTime() - start;
+
+            session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+            session.transfer(flowFile, REL_SUCCESS);
+            getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+        } catch (final Exception e) {
+            getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            error = true;
+        } finally {
+            if ( error ) {
+                producer.close();
+            } else {
+                returnProducer(producer);
+            }
+        }
+    }
+
+}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000000..6ae3da1885
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,2 @@
+org.apache.nifi.processors.kafka.GetKafka
+org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
new file mode 100644
index 0000000000..d429d6b5eb
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -0,0 +1,147 @@
Description:

This Processor polls Apache Kafka for data. When a message is received from Kafka, this Processor emits a FlowFile whose content is the value of the Kafka message. If the message has a key associated with it, an attribute named kafka.key will be added to the FlowFile, with the value being the UTF-8 encoded value of the message's key.

Kafka supports the notion of a Consumer Group when pulling messages in order to provide scalability while still offering a publish-subscribe interface. Each Consumer Group must have a unique identifier. The Consumer Group identifier that is used by NiFi is the UUID of the Processor. This means that all of the nodes within a cluster will use the same Consumer Group Identifier so that they do not receive duplicate data, but multiple GetKafka Processors can be used to pull from multiple Topics, as each Processor will receive a different Processor UUID and therefore a different Consumer Group Identifier.
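As a minimal sketch of the Consumer Group wiring described above (mirroring the createConsumers() method in this patch), the following shows how the high-level Kafka consumer is configured; the zkConnect and processorUuid parameters are illustrative placeholders, and the timeout settings are omitted for brevity:

    // Sketch: the Consumer Group id is the Processor's UUID, so all nodes in a
    // NiFi cluster join the same group and do not receive duplicate messages.
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ConsumerGroupSketch {
        public static ConsumerConnector connect(final String zkConnect, final String processorUuid) {
            final Properties props = new Properties();
            props.setProperty("zookeeper.connect", zkConnect);      // ZooKeeper Connection String property
            props.setProperty("group.id", processorUuid);           // Consumer Group id = Processor UUID
            props.setProperty("auto.commit.interval.ms", "60000");  // Zookeeper Commit Frequency (default 60 secs)
            props.setProperty("auto.commit.enable", "true");
            props.setProperty("auto.offset.reset", "smallest");     // start from the earliest available offset
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        }
    }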

+ Modifies Attributes: +

+ + + + + + + + + + + + + + + + + + + + + + + + + +
Attribute NameDescription
kafka.keyThe key of the Kafka message, if it exists. If the message does not have a key, + this attribute will not be added.
kafka.topicThe name of the Kafka Topic from which the message was received
kafka.partitionThe partition of the Kafka Topic from which the message was received
kafka.offsetThe offset of the message within the Kafka partition
+ + +
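For illustration, these attributes can be inspected with the same TestRunner and MockFlowFile utilities that TestGetKafka uses later in this patch; this is a hedged sketch, and the connection string and topic shown are placeholders:

    import org.apache.nifi.util.MockFlowFile;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class KafkaAttributesSketch {
        public void printAttributes() {
            final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
            runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
            runner.setProperty(GetKafka.TOPIC, "testX");
            runner.run(1, false);

            for (final MockFlowFile flowFile : runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS)) {
                // kafka.topic, kafka.partition, and kafka.offset are always present;
                // kafka.key is present only when the message carried a key
                System.out.println(flowFile.getAttribute("kafka.topic"));
                System.out.println(flowFile.getAttribute("kafka.partition"));
                System.out.println(flowFile.getAttribute("kafka.offset"));
            }
        }
    }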

+ Properties: +

+

In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.

+
    +
  • ZooKeeper Connection String +
      +
    • The Connection String to use in order to connect to ZooKeeper. This is often a + comma-separated list of <host>:<port> combinations. For example, + host1:2181,host2:2181,host3:2188
    • +
    • Default value: no default
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Topic Name +
      +
    • The Kafka Topic to pull messages from
    • +
    • Default value: no default
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Zookeeper Commit Frequency +
      +
    • Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. + A longer time period will result in better overall performance but can result in more data + duplication if a NiFi node is lost +
    • +
    • Default value: 60 secs
    • +
    • Supports expression language: false
    • +
    +
  • +
  • ZooKeeper Communications Timeout +
      +
    • The amount of time to wait for a response from ZooKeeper before determining that there is a communications error
    • +
    • Default value: 30 secs
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Kafka Communications Timeout +
      +
    • The amount of time to wait for a response from Kafka before determining that there is a communications error
    • +
    • Default value: 30 secs
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Client Name +
      +
    • Client Name to use when communicating with Kafka
    • +
    • Default value: "NiFi-" followed by the UUID of the Processor
    • +
    • Supports expression language: false
    • +
    +
  • + +
+

+ Relationships: +

+
    +
  • success +
      +
    • All messages that are received from Kafka are routed to the 'success' relationship
    • +
    +
  • +
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
new file mode 100644
index 0000000000..38256c5fe9
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -0,0 +1,177 @@

Description:

+

+ This Processors puts the contents of a FlowFile to a Topic in + Apache Kafka. The full contents of + a FlowFile becomes the contents of a single message in Kafka. + This message is optionally assigned a key by using the + <Kafka Key> Property. +

+ + +
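The sketch below mirrors how this patch's onTrigger() method builds the Kafka message from the FlowFile content, with and without a key; the method and its parameters are illustrative:

    import java.nio.charset.StandardCharsets;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;

    public class MessageSketch {
        public static void send(final Producer<byte[], byte[]> producer, final String topic,
                                final String key, final byte[] value) {
            final KeyedMessage<byte[], byte[]> message;
            if (key == null) {
                // no <Kafka Key> value: the message is sent without a key
                message = new KeyedMessage<>(topic, value);
            } else {
                message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
            }
            producer.send(message);
        }
    }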

+ Properties: +

+

In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.

+
    +
  • Known Brokers +
      +
    • + A comma-separated list of known Kafka Brokers in the format + &lgt;host>:<port>. This list does not need to be + exhaustive but provides a mechanism for determining which + other nodes belong to the Kafka cluster. +
    • +
    • Default value: no default
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Topic Name +
      +
    • The Kafka Topic to send messages to. While the GetKafka + Processor requires a statically named Topic so that it knows + where to fetch messages from, the PutKafka Processor does allow + the Expression Language to be used so that a single PutKafka + Processor can be used to send messages to many different Kafka + topics. +
    • +
    • Default value: no default
    • +
    • Supports expression language: true
    • +
    +
  • + +
  • Kafka Key +
      +
    • + The Key to use for the Message. If no value is given, the message + will not be given a Key. +
    • +
    • Default value: no default
    • +
    • Supports expression language: true
    • +
    +
  • +
  • Delivery Guarantee +
      +
    • + Specifies the requirement for guaranteeing that a message is sent to Kafka. + This Property can have one of three different values: +
        +
      • + Guarantee Replicated Delivery - FlowFile will be routed to + failure unless the message is replicated to the appropriate number + of Kafka Nodes according to the Topic configuration +
      • +
      • + Guarantee Single Node Delivery - FlowFile will be routed to + success if the message is received by a single Kafka node, + whether or not it is replicated. This is faster than + <Guarantee Replicated Delivery> but can result in data loss + if a Kafka node crashes +
      • +
      • + Best Effort - FlowFile will be routed to success after successfully + writing the content to a Kafka node, without waiting for a response. + This provides the best performance but may result in data loss. +
      • +
      +
    • +
    • Default value: Best Effort
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Communications Timeout +
      +
    • + The amount of time to wait for a response from Kafka before determining + that there is a communications error +
    • +
    • Default value: 30 secs
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Max FlowFile Size +
      +
    • + Specifies the amount of data that can be buffered to send to Kafka. Because + the contents of the FlowFile must be buffered into memory before they can + be sent to Kafka, attempting to send a very large FlowFile can cause + problems by causing the machine to run out of memory. + This helps to prevent the system from running out of memory, the PutKafka + Processor exposes a property for specifying the maximum size of a FlowFile. + If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. +
    • +
    • Default value: 1 MB
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Client Name +
      +
    • Client Name to use when communicating with Kafka
    • +
    • Default value: "NiFi-" followed by the UUID of the Processor
    • +
    • Supports expression language: false
    • +
    +
  • +
+ + +
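As referenced under <Delivery Guarantee> above, the sketch below mirrors the producer configuration built by this patch's createProducer() method; the three Delivery Guarantee values map to request.required.acks values of -1, 1, and 0 respectively, and the broker list and client id shown are placeholders:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    public class ProducerConfigSketch {
        public static Producer<byte[], byte[]> create(final String brokers, final String acks) {
            final Properties properties = new Properties();
            properties.setProperty("metadata.broker.list", brokers);  // Known Brokers property
            properties.setProperty("request.required.acks", acks);    // Delivery Guarantee: "-1", "1", or "0"
            properties.setProperty("client.id", "NiFi-example");      // Client Name property (placeholder)
            properties.setProperty("request.timeout.ms", "30000");    // Communications Timeout (default 30 secs)
            properties.setProperty("message.send.max.retries", "1");
            properties.setProperty("producer.type", "sync");          // synchronous send so failures surface immediately
            return new Producer<>(new ProducerConfig(properties));
        }
    }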

+ Relationships: +

+
    +
  • success +
      +
    • All FlowFiles that are successfully sent to Kafka are routed + to this relationship. +
    • +
    +
  • + +
  • reject +
      +
    • Any FlowFile whose content size exceeds the configured value for + the <Max FlowFile Size> property will be routed to this + relationship. +
    • +
    +
  • + +
  • failure +
      +
    • All FlowFiles that cannot be sent to Kafka for any reason other + than their content size exceeding the value of the <Max FlowFile + Size> property will be routed to this relationship. +
    • +
    +
  • + +
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
new file mode 100644
index 0000000000..2199a9ce7a
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.List;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Intended only for local tests to verify functionality.")
+public class TestGetKafka {
+
+    public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
+
+    @BeforeClass
+    public static void configureLogging() {
+        System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO");
+        BasicConfigurator.configure();
+    }
+
+    @Test
+    public void testIntegrationLocally() {
+        final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+
+        runner.run(20, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
+        for ( final MockFlowFile flowFile : flowFiles ) {
+            System.out.println(flowFile.getAttributes());
+            System.out.println(new String(flowFile.toByteArray()));
+            System.out.println();
+        }
+    }
+
+}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
new file mode 100644
index 0000000000..2e6aacfd7d
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -0,0 +1,48 @@
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore("Intended only for local testing to verify functionality.")
+public class TestPutKafka {
+
+    @Test
+    public void testKeyValuePut() {
+        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+        runner.setProperty(PutKafka.KEY, "${kafka.key}");
+        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("kafka.topic", "test");
+        attributes.put("kafka.key", "key3");
+
+        final byte[] data = "Hello, World, Again! ;)".getBytes();
+        runner.enqueue(data, attributes);
+        runner.enqueue(data, attributes);
+        runner.enqueue(data, attributes);
+        runner.enqueue(data, attributes);
+
+        runner.run(5);
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+        final MockFlowFile mff = mffs.get(0);
+
+        assertTrue(Arrays.equals(data, mff.toByteArray()));
+    }
+
+}
diff --git a/nar-bundles/kafka-bundle/pom.xml b/nar-bundles/kafka-bundle/pom.xml
new file mode 100644
index 0000000000..146db1295f
--- /dev/null
+++ b/nar-bundles/kafka-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nar-bundle-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka-bundle</artifactId>
+    <packaging>pom</packaging>
+
+    <name>kafka-bundle</name>
+
+    <modules>
+        <module>kafka-processors</module>
+        <module>kafka-nar</module>
+    </modules>
+</project>
diff --git a/pom.xml b/pom.xml
index 1de1a0e28e..68e718a0fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -766,6 +766,12 @@
                 <version>${project.version}</version>
                 <type>nar</type>
             </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
+                <artifactId>kafka-nar</artifactId>
+                <version>${project.version}</version>
+                <type>nar</type>
+            </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>