Merge branch 'NIFI-1192B' of https://github.com/olegz/nifi into NIFI-1192

Mark Payne 2015-11-25 12:30:15 -05:00
commit 657885e5ba
4 changed files with 216 additions and 87 deletions

nifi-kafka-processors/pom.xml

@@ -65,6 +65,11 @@
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>

org/apache/nifi/processors/kafka/GetKafka.java

@@ -24,14 +25,15 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -39,9 +40,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
@@ -56,6 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
@@ -70,6 +70,11 @@ import kafka.message.MessageAndMetadata;
+ " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
@WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
@WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
+ " overriden with warning message describing the override."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
public class GetKafka extends AbstractProcessor {
public static final String SMALLEST = "smallest";
@@ -167,9 +172,7 @@ public class GetKafka extends AbstractProcessor {
private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
private volatile ConsumerConnector consumer;
private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -203,26 +206,69 @@ public class GetKafka extends AbstractProcessor {
return relationships;
}
public void createConsumers(final ProcessContext context) {
final String topic = context.getProperty(TOPIC).getValue();
final Properties props = new Properties();
props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
props.setProperty("auto.commit.enable", "true"); // just be explicit
props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
props.setProperty("zookeeper.connection.timeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
if (props.containsKey(descriptor.getName())) {
this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+ props.getProperty(descriptor.getName()) + "' with dynamically set value '"
+ entry.getValue() + "'.");
}
props.setProperty(descriptor.getName(), entry.getValue());
}
}
/*
* Unless the user sets it to some explicit value, we set 'consumer.timeout.ms'
* to the lowest possible value of 1 millisecond to ensure that
* consumerStream.hasNext() doesn't block. See
* http://kafka.apache.org/documentation.html#configuration as well as the
* comment in the 'catch ConsumerTimeoutException' block in onTrigger() for
* more explanation of the reasoning behind it.
*/
if (!props.containsKey("consumer.timeout.ms")) {
this.getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer"
+ " block in the event when no events are present in Kafka topic. If you wish to change this value "
+ " set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk)"
+ " set its value to -1.");
props.setProperty("consumer.timeout.ms", "1");
}
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
final Map<String, Integer> topicCountMap = new HashMap<>(1);
int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
int concurrentTaskToUse = context.getMaxConcurrentTasks();
if (context.getMaxConcurrentTasks() < partitionCount){
this.getLogger().warn("The number of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+ "this processor is less than the number of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+ "Consider making it equal to the partition count for the most efficient event consumption.");
} else if (context.getMaxConcurrentTasks() > partitionCount){
concurrentTaskToUse = partitionCount;
this.getLogger().warn("The number of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+ "this processor is greater than the number of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+ "The extra tasks would never see a message, so the partition count '" + partitionCount + "' will be used to "
+ "consume events.");
}
topicCountMap.put(topic, concurrentTaskToUse);
final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
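
To make the capping rule above concrete, a worked example with hypothetical numbers:

    // Hypothetical: a 4-partition topic consumed with 6 concurrent tasks.
    int partitionCount = 4;      // as returned by KafkaUtils.retrievePartitionCountForTopic(...)
    int maxConcurrentTasks = 6;  // the processor's Concurrent Tasks setting
    // Tasks beyond the partition count would never see a message, so the
    // effective stream count is capped at the partition count:
    int concurrentTaskToUse = Math.min(maxConcurrentTasks, partitionCount); // == 4
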
@@ -231,10 +277,12 @@ public class GetKafka extends AbstractProcessor {
for (final KafkaStream<byte[], byte[]> stream : streams) {
streamIterators.add(stream.iterator());
}
this.consumerStreamsReady.set(true);
}
@OnStopped
public void shutdownConsumer() {
this.consumerStreamsReady.set(false);
if (consumer != null) {
try {
consumer.commitOffsets();
@@ -244,75 +292,57 @@ public class GetKafka extends AbstractProcessor {
}
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
/*
* Ensures that consumer streams are ready upon the first invocation of
* onTrigger. Will be reset to 'false' in the event of an exception.
*/
synchronized (this.consumerStreamsReady) {
if (!this.consumerStreamsReady.get()) {
this.createConsumers(context);
}
}
ConsumerIterator<byte[], byte[]> iterator = this.getStreamIterator();
if (iterator != null) {
this.consumeFromKafka(context, session, iterator);
}
}
protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
return this.streamIterators.poll();
}
private void consumeFromKafka(final ProcessContext context, final ProcessSession session,
ConsumerIterator<byte[], byte[]> iterator) throws ProcessException {
final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
final String topic = context.getProperty(TOPIC).getValue();
FlowFile flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", topic);
final long start = System.nanoTime();
int msgCount = 0;
try {
for (; msgCount < batchSize && iterator.hasNext(); msgCount++) {
final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
if (batchSize == 1) {
final byte[] key = mam.key();
// the kafka.key, kafka.offset, and kafka.partition attributes are added only
// for a batch size of 1.
if (key != null) {
@@ -334,33 +364,26 @@ public class GetKafka extends AbstractProcessor {
out.write(mam.message());
}
});
}
this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
} catch (ConsumerTimeoutException e) {
/*
* By default, Kafka blocks indefinitely in stream.hasNext() if the topic
* is empty. If the 'consumer.timeout.ms' property is set (see
* http://kafka.apache.org/documentation.html#configuration), hasNext()
* instead fails with this exception. To this processor it simply means
* there are no messages, so the current task should exit without failure,
* releasing the flow file with any events it was able to accumulate.
*/
this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
} catch (final Exception e) {
this.shutdownConsumer();
getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
if (flowFile != null) {
session.remove(flowFile);
}
} finally {
// Add the iterator back to the queue
if (iterator != null) {
streamIterators.offer(iterator);
@@ -368,4 +391,22 @@ public class GetKafka extends AbstractProcessor {
}
}
/**
* Releases the flow file. In the context of this operation, releasing means:
* if the flow file is empty, remove it from the session; otherwise set its
* attributes and transfer it to REL_SUCCESS.
*/
private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis);
getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis});
session.transfer(flowFile, REL_SUCCESS);
}
}
}
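
The contract between the 1 ms 'consumer.timeout.ms' default set in createConsumers() and the ConsumerTimeoutException handling above reduces to the following pattern; a sketch against the Kafka 0.8 high-level consumer API, where handleMessage() is a hypothetical stand-in for the session/FlowFile work:

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.ConsumerTimeoutException;
    import kafka.message.MessageAndMetadata;

    class ConsumerTimeoutSketch {
        static void drain(ConsumerIterator<byte[], byte[]> iterator) {
            try {
                // With consumer.timeout.ms=1, hasNext() throws after ~1 ms of
                // no data instead of blocking forever on an empty topic.
                while (iterator.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> mam = iterator.next();
                    handleMessage(mam.message());
                }
            } catch (ConsumerTimeoutException e) {
                // Not an error: the topic is simply empty right now, so release
                // any accumulated events and let the task end cleanly.
            }
        }

        static void handleMessage(byte[] payload) {
            // stand-in for appending the message to the FlowFile
        }
    }
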

org/apache/nifi/processors/kafka/KafkaUtils.java (new file)

@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka;
import java.util.Collections;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZKStringSerializer;
import scala.collection.JavaConversions;
/**
* Utility class to support interaction with Kafka internals.
*/
class KafkaUtils {
/**
* Retrieves the number of partitions for a given Kafka topic.
*/
static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
ZkClient zkClient = new ZkClient(zookeeperConnectionString);
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return ZKStringSerializer.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(bytes);
}
});
try {
scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
// fetchTopicMetadataFromZk returns one TopicMetadata per requested topic;
// the partition count is the size of that topic's partition metadata, not
// the size of the returned set.
return topicMetadatas.head().partitionsMetadata().size();
} finally {
zkClient.close();
}
}
}
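
A hypothetical caller, mirroring how createConsumers() uses this helper; the connection string and topic below are placeholders:

    // Placeholder values; GetKafka passes its configured ZooKeeper string and topic.
    int partitions = KafkaUtils.retrievePartitionCountForTopic("localhost:2181", "events");
    System.out.println("Topic 'events' has " + partitions + " partitions");
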

org/apache/nifi/processors/kafka/PutKafka.java

@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -44,6 +45,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -81,6 +83,11 @@ import scala.actors.threadpool.Arrays;
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
+ "user-specified delimiter, such as a new-line.")
@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
+ " overriden with warning message describing the override."
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
public class PutKafka extends AbstractSessionFactoryProcessor {
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
@@ -356,6 +363,18 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
properties.setProperty("retries", "0");
properties.setProperty("block.on.buffer.full", "false");
for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
if (properties.containsKey(descriptor.getName())) {
this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+ properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
+ entry.getValue() + "'.");
}
properties.setProperty(descriptor.getName(), entry.getValue());
}
}
return properties;
}
@@ -397,6 +416,14 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
return partitionInfos.get(partitionIdx).partition();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
.build();
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
FlowFileMessageBatch batch;
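
As with GetKafka, a minimal nifi-mock sketch (hypothetical values) of dynamic properties feeding into the producer configuration built above:

    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;

    public class PutKafkaDynamicPropertySketch {
        public static void main(String[] args) {
            final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
            // Overrides the static "retries=0" set above; a warning is logged:
            runner.setProperty("retries", "3");
            // Any other producer setting is passed through to the Kafka config:
            runner.setProperty("linger.ms", "5");
        }
    }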