NIFI-1097: Rewrite PutKafka to use the new producer api

Mark Payne 2015-11-03 16:17:32 -05:00
parent 7a165b62cc
commit 22de23baa6
3 changed files with 734 additions and 436 deletions
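For orientation, here is a minimal, self-contained sketch of the new kafka-clients producer API that this commit adopts (broker address, topic, key, and payload below are placeholders, not values from the commit): a KafkaProducer is built from a Properties object plus byte-array serializers, and each ProducerRecord is sent with a Callback that reports per-record success or failure asynchronously.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class NewProducerApiSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker list
        props.setProperty("acks", "all");                         // the key the processor now maps DELIVERY_GUARANTEE onto

        // Serializers are passed explicitly, just as the processor does when it creates its producer.
        final Producer<byte[], byte[]> producer =
            new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());

        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
            "topic1", null, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));

        // The new API reports the outcome of each record asynchronously via the Callback.
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(final RecordMetadata metadata, final Exception exception) {
                if (exception == null) {
                    System.out.println("sent to offset " + metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.close();
    }
}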


@ -34,6 +34,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.1</artifactId>


@ -21,30 +21,47 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
@ -56,21 +73,20 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import scala.actors.threadpool.Arrays;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
+ "user-specified delimiter, such as a new-line.")
@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
public class PutKafka extends AbstractSessionFactoryProcessor {
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to"
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
+ " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
@ -79,16 +95,6 @@ public class PutKafka extends AbstractProcessor {
+ " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+ " in data loss.");
/**
* AllowableValue for a Producer Type that synchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately.");
/**
* AllowableValue for a Producer Type that asynchronously sends messages to Kafka
*/
public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka."
+ " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data.");
/**
* AllowableValue for sending messages to Kafka without compression
@ -105,6 +111,13 @@ public class PutKafka extends AbstractProcessor {
*/
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy");
static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin",
"Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary.");
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random",
"Messages will be assigned to random partitions.");
static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined",
"The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be assigned to the same partition.");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers")
@ -120,6 +133,21 @@ public class PutKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
.name("Partition Strategy")
.description("Specifies how messages should be partitioned when sent to Kafka")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
.defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
.required(true)
.build();
public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("Partition")
.description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. "
+ "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
.name("Kafka Key")
.description("The Key to use for the Message")
@ -140,7 +168,10 @@ public class PutKafka extends AbstractProcessor {
.description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ "If not specified, the entire content of the FlowFile will be used as a single message. "
+ "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ "sent as a separate Kafka message.")
+ "sent as a separate Kafka message. Note that if messages are delimited and some messages for a given FlowFile "
+ "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those "
+ "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' "
+ "relationship.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@ -151,6 +182,13 @@ public class PutKafka extends AbstractProcessor {
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue("5 MB")
.build();
static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder()
.name("Max Record Size")
.description("The maximum size that any individual record can be.")
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.required(true)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
@ -168,20 +206,10 @@ public class PutKafka extends AbstractProcessor {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder()
.name("Producer Type")
.description("This parameter specifies whether the messages are sent asynchronously in a background thread.")
.required(true)
.allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
.defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue())
.build();
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Batch Size")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode."
+ " The producer will wait until either this number of messages are ready"
.displayName("Batch Size")
.description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready"
+ " to send or \"Queue Buffering Max Time\" is reached.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
@ -189,35 +217,13 @@ public class PutKafka extends AbstractProcessor {
.build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder()
.name("Queue Buffering Max Time")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms"
+ " will try to batch together 100ms of messages to send at once. This will improve"
.description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms"
+ " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve"
+ " throughput but adds message delivery latency due to the buffering.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("5 secs")
.build();
public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder()
.name("Queue Buffer Max Count")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The maximum number of unsent messages that can be queued up in the producer when"
+ " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("10000")
.build();
public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder()
.name("Queue Enqueue Timeout")
.description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"."
+ " The amount of time to block before dropping messages when running in "
+ PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode"
+ " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will"
+ " be enqueued immediately or dropped if the queue is full (the producer send call will"
+ " never block). If not set, the producer will block indefinitely and never willingly"
+ " drop a send.")
.required(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
.name("Compression Codec")
.description("This parameter allows you to specify the compression codec for all"
@ -227,16 +233,6 @@ public class PutKafka extends AbstractProcessor {
.allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY)
.defaultValue(COMPRESSION_CODEC_NONE.getValue())
.build();
public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder()
.name("Compressed Topics")
.description("This parameter allows you to set whether compression should be turned on"
+ " for particular topics. If the compression codec is anything other than"
+ " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any."
+ " If the list of compressed topics is empty, then enable the specified"
+ " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + ","
+ " compression is disabled for all topics")
.required(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -247,7 +243,13 @@ public class PutKafka extends AbstractProcessor {
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build();
private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+");
private final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>();
private final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
private final ConcurrentMap<String, AtomicLong> partitionIndexMap = new ConcurrentHashMap<>();
private volatile Producer<byte[], byte[]> producer;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@ -259,36 +261,21 @@ public class PutKafka extends AbstractProcessor {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(SEED_BROKERS);
props.add(TOPIC);
props.add(PARTITION_STRATEGY);
props.add(PARTITION);
props.add(KEY);
props.add(DELIVERY_GUARANTEE);
props.add(MESSAGE_DELIMITER);
props.add(MAX_BUFFER_SIZE);
props.add(MAX_RECORD_SIZE);
props.add(TIMEOUT);
props.add(PRODUCER_TYPE);
props.add(BATCH_NUM_MESSAGES);
props.add(QUEUE_BUFFERING_MAX_MESSAGES);
props.add(QUEUE_BUFFERING_MAX);
props.add(QUEUE_ENQUEUE_TIMEOUT);
props.add(COMPRESSION_CODEC);
props.add(COMPRESSED_TOPICS);
props.add(clientName);
return props;
}
@Override
public Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> errors = new ArrayList<>(super.customValidate(context));
final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger();
final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger();
if (batchMessages > bufferMaxMessages) {
errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false)
.explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build());
}
return errors;
}
@Override
public Set<Relationship> getRelationships() {
@ -298,71 +285,131 @@ public class PutKafka extends AbstractProcessor {
return relationships;
}
@OnStopped
public void closeProducers() {
Producer<byte[], byte[]> producer;
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> results = new ArrayList<>();
while ((producer = producers.poll()) != null) {
final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) {
results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation(
"The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy").build());
}
return results;
}
protected Producer<byte[], byte[]> getProducer() {
return producer;
}
@OnStopped
public void cleanup() {
final Producer<byte[], byte[]> producer = getProducer();
if (producer != null) {
producer.close();
}
for (final FlowFileMessageBatch batch : activeBatches) {
batch.cancelOrComplete();
}
}
protected ProducerConfig createConfig(final ProcessContext context) {
@OnScheduled
public void createProducer(final ProcessContext context) {
producer = new KafkaProducer<byte[], byte[]>(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer());
}
protected int getActiveMessageBatchCount() {
return activeBatches.size();
}
protected int getCompleteMessageBatchCount() {
return completeBatches.size();
}
protected Properties createConfig(final ProcessContext context) {
final String brokers = context.getProperty(SEED_BROKERS).getValue();
final Properties properties = new Properties();
properties.setProperty("metadata.broker.list", brokers);
properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue());
properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue());
final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
properties.setProperty("timeout.ms", timeout);
properties.setProperty("metadata.fetch.timeout.ms", timeout);
properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue());
properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue()));
final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
properties.setProperty("buffer.memory", String.valueOf(maxBufferSize));
final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.type", compressionCodec);
final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS);
if (queueBufferingMillis != null) {
properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis));
}
properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue());
final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
if (queueEnqueueTimeoutMillis != null) {
properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis));
properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis));
}
final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue();
properties.setProperty("compression.codec", compressionCodec);
properties.setProperty("retries", "0");
properties.setProperty("block.on.buffer.full", "false");
final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue();
if (compressedTopics != null) {
properties.setProperty("compressed.topics", compressedTopics);
return properties;
}
return new ProducerConfig(properties);
private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) {
final long unnormalizedIndex;
final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
AtomicLong partitionIndex = partitionIndexMap.get(topic);
if (partitionIndex == null) {
partitionIndex = new AtomicLong(0L);
final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex);
if (existing != null) {
partitionIndex = existing;
}
}
protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
return new Producer<>(createConfig(context));
unnormalizedIndex = partitionIndex.getAndIncrement();
} else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
return null;
} else {
if (context.getProperty(PARTITION).isSet()) {
final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
if (NUMBER_PATTERN.matcher(partitionValue).matches()) {
// Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions.
unnormalizedIndex = Long.parseLong(partitionValue) - 1;
} else {
unnormalizedIndex = partitionValue.hashCode();
}
} else {
return null;
}
}
private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
final Producer<byte[], byte[]> producer = producers.poll();
return producer == null ? createProducer(context) : producer;
}
private void returnProducer(final Producer<byte[], byte[]> producer) {
producers.offer(producer);
final Producer<byte[], byte[]> producer = getProducer();
final List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
final int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size());
return partitionInfos.get(partitionIdx).partition();
}
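To make the normalization above concrete, here is a small self-contained sketch with hypothetical values (a topic reporting three partitions and a user-supplied <Partition> of "3"); the values are illustrative and not taken from the commit.

public class PartitionMappingSketch {
    public static void main(final String[] args) {
        // producer.partitionsFor(topic).size() in the processor; assumed to be 3 here.
        final int partitionCount = 3;

        // A user-supplied <Partition> of "3" means the 3rd partition in the list,
        // so 1 is subtracted to obtain a zero-based index.
        final long unnormalizedIndex = Long.parseLong("3") - 1;

        // The index is wrapped with modulo so an out-of-range value still maps to a real partition.
        final int partitionIdx = (int) (unnormalizedIndex % partitionCount);
        System.out.println(partitionIdx); // prints 2; partitionInfos.get(2).partition() would then be used
    }
}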
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
FlowFileMessageBatch batch;
while ((batch = completeBatches.poll()) != null) {
batch.completeSession();
}
final ProcessSession session = sessionFactory.createSession();
final FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
final long start = System.nanoTime();
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
@ -371,8 +418,7 @@ public class PutKafka extends AbstractProcessor {
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
final Producer<byte[], byte[]> producer = borrowProducer(context);
final Producer<byte[], byte[]> producer = getProducer();
if (delimiter == null) {
// Send the entire FlowFile as a single message.
@ -384,31 +430,38 @@ public class PutKafka extends AbstractProcessor {
}
});
boolean error = false;
final Integer partition;
try {
final KeyedMessage<byte[], byte[]> message;
if (key == null) {
message = new KeyedMessage<>(topic, value);
} else {
message = new KeyedMessage<>(topic, keyBytes, value);
}
producer.send(message);
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
partition = getPartition(context, flowFile, topic);
} catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e });
getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
session.transfer(session.penalize(flowFile), REL_FAILURE);
error = true;
} finally {
if (error) {
producer.close();
} else {
returnProducer(producer);
session.commit();
return;
}
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value);
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
messageBatch.setNumMessages(1);
activeBatches.add(messageBatch);
try {
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception == null) {
// record was successfully sent.
messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset());
} else {
messageBatch.addFailedRange(0L, flowFile.getSize(), exception);
}
}
});
} catch (final BufferExhaustedException bee) {
messageBatch.addFailedRange(0L, flowFile.getSize(), bee);
context.yield();
return;
}
} else {
final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
@ -418,9 +471,9 @@ public class PutKafka extends AbstractProcessor {
// the stream of bytes in the FlowFile
final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
boolean error = false;
final LongHolder lastMessageOffset = new LongHolder(0L);
final LongHolder messagesSent = new LongHolder(0L);
final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic);
activeBatches.add(messageBatch);
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
session.read(flowFile, new InputStreamCallback() {
@ -430,13 +483,12 @@ public class PutKafka extends AbstractProcessor {
boolean streamFinished = false;
final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
long messageBytes = 0L; // size of messages in the 'messages' list
int nextByte;
try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
long messageStartOffset = in.getBytesConsumed();
// read until we're out of data.
while (!streamFinished) {
nextByte = in.read();
@ -457,107 +509,309 @@ public class PutKafka extends AbstractProcessor {
}
if (data != null) {
final long messageEndOffset = in.getBytesConsumed();
// If the message has no data, ignore it.
if (data.length != 0) {
// either we ran out of data or we reached the end of the message.
// Either way, create the message because it's ready to send.
final KeyedMessage<byte[], byte[]> message;
if (key == null) {
message = new KeyedMessage<>(topic, data);
} else {
message = new KeyedMessage<>(topic, keyBytes, data);
}
// Add the message to the list of messages ready to send. If we've reached our
// threshold of how many we're willing to send (or if we're out of data), go ahead
// and send the whole List.
messages.add(message);
messageBytes += data.length;
if (messageBytes >= maxBufferSize || streamFinished) {
// send the messages, then reset our state.
final Integer partition;
try {
producer.send(messages);
partition = getPartition(context, flowFile, topic);
} catch (final Exception e) {
// we wrap the general exception in ProcessException because we want to separate
// failures in sending messages from general Exceptions that would indicate bugs
// in the Processor. Failure to send a message should be handled appropriately, but
// we don't want to catch the general Exception or RuntimeException in order to catch
// failures from Kafka's Producer.
throw new ProcessException("Failed to send messages to Kafka", e);
messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e);
getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e});
continue;
}
messagesSent.addAndGet(messages.size()); // count number of messages sent
// reset state
messages.clear();
messageBytes = 0;
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data);
final long rangeStart = messageStartOffset;
// We've successfully sent a batch of messages. Keep track of the byte offset in the
// FlowFile of the last successfully sent message. This way, if the messages cannot
// all be successfully sent, we know where to split off the data. This allows us to then
// split off the first X number of bytes and send to 'success' and then split off the rest
// and send them to 'failure'.
lastMessageOffset.set(in.getBytesConsumed());
try {
producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(final RecordMetadata metadata, final Exception exception) {
if (exception == null) {
// record was successfully sent.
messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset());
} else {
messageBatch.addFailedRange(rangeStart, messageEndOffset, exception);
}
}
});
messagesSent.incrementAndGet();
} catch (final BufferExhaustedException bee) {
// Not enough room in the buffer. Add from the beginning of this message to the end of the FlowFile as a failed range
messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee);
context.yield();
return;
}
}
// reset BAOS so that we can start a new message.
baos.reset();
data = null;
}
}
// If there are messages left, send them
if (!messages.isEmpty()) {
try {
messagesSent.addAndGet(messages.size()); // add count of messages
producer.send(messages);
} catch (final Exception e) {
throw new ProcessException("Failed to send messages to Kafka", e);
messageStartOffset = in.getBytesConsumed();
}
}
}
}
});
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
messageBatch.setNumMessages(messagesSent.get());
}
}
}
private static class Range {
private final long start;
private final long end;
private final Long kafkaOffset;
public Range(final long start, final long end, final Long kafkaOffset) {
this.start = start;
this.end = end;
this.kafkaOffset = kafkaOffset;
}
public long getStart() {
return start;
}
public long getEnd() {
return end;
}
public Long getKafkaOffset() {
return kafkaOffset;
}
@Override
public String toString() {
return "Range[" + start + "-" + end + "]";
}
}
private class FlowFileMessageBatch {
private final ProcessSession session;
private final FlowFile flowFile;
private final String topic;
private final long startTime = System.nanoTime();
private final List<Range> successfulRanges = new ArrayList<>();
private final List<Range> failedRanges = new ArrayList<>();
private Exception lastFailureReason;
private long numMessages = -1L;
private long completeTime = 0L;
private boolean canceled = false;
public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) {
this.session = session;
this.flowFile = flowFile;
this.topic = topic;
}
public synchronized void cancelOrComplete() {
if (isComplete()) {
completeSession();
return;
}
this.canceled = true;
session.rollback();
successfulRanges.clear();
failedRanges.clear();
}
public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) {
if (canceled) {
return;
}
successfulRanges.add(new Range(start, end, kafkaOffset));
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
public synchronized void addFailedRange(final long start, final long end, final Exception e) {
if (canceled) {
return;
}
failedRanges.add(new Range(start, end, null));
lastFailureReason = e;
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
private boolean isComplete() {
return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages);
}
public synchronized void setNumMessages(final long msgCount) {
this.numMessages = msgCount;
if (isComplete()) {
activeBatches.remove(this);
completeBatches.add(this);
completeTime = System.nanoTime();
}
}
private Long getMin(final Long a, final Long b) {
if (a == null && b == null) {
return null;
}
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return Math.min(a, b);
}
private Long getMax(final Long a, final Long b) {
if (a == null && b == null) {
return null;
}
if (a == null) {
return b;
}
if (b == null) {
return a;
}
return Math.max(a, b);
}
private void transferRanges(final List<Range> ranges, final Relationship relationship) {
Collections.sort(ranges, new Comparator<Range>() {
@Override
public int compare(final Range o1, final Range o2) {
return Long.compare(o1.getStart(), o2.getStart());
}
});
for (int i = 0; i < ranges.size(); i++) {
Range range = ranges.get(i);
int count = 1;
Long smallestKafkaOffset = range.getKafkaOffset();
Long largestKafkaOffset = range.getKafkaOffset();
while (i + 1 < ranges.size()) {
// Check if the next range in the List continues where this one left off.
final Range nextRange = ranges.get(i + 1);
if (nextRange.getStart() == range.getEnd()) {
// We have two ranges in a row that are contiguous; combine them into a single Range.
range = new Range(range.getStart(), nextRange.getEnd(), null);
smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset());
largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset());
count++;
i++;
} else {
break;
}
}
// Create a FlowFile for this range.
FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart());
if (relationship == REL_SUCCESS) {
session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
session.transfer(child, relationship);
} else {
session.transfer(session.penalize(child), relationship);
}
}
}
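As a worked illustration of the merge above (byte offsets are hypothetical, not taken from the commit): two successful messages covering contiguous byte ranges of the source FlowFile collapse into a single child range before session.clone() is called.

public class RangeMergeSketch {
    public static void main(final String[] args) {
        // Successful ranges [0,2) and [2,4): the second starts where the first ends, so they merge.
        final long[][] successfulRanges = { {0L, 2L}, {2L, 4L} };

        long start = successfulRanges[0][0];
        long end = successfulRanges[0][1];
        for (int i = 1; i < successfulRanges.length; i++) {
            if (successfulRanges[i][0] == end) {
                end = successfulRanges[i][1]; // contiguous: extend the combined range
            }
        }

        // The processor would clone one child FlowFile covering the combined range.
        System.out.println("session.clone(flowFile, " + start + ", " + (end - start) + ")"); // clone(flowFile, 0, 4)
    }
}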
private String getTransitUri() {
final List<PartitionInfo> partitions = getProducer().partitionsFor(topic);
if (partitions.isEmpty()) {
return "kafka://unknown-host" + "/topics/" + topic;
}
final PartitionInfo info = partitions.get(0);
final Node leader = info.leader();
final String host = leader.host();
final int port = leader.port();
return "kafka://" + host + ":" + port + "/topics/" + topic;
}
public synchronized void completeSession() {
if (canceled) {
return;
}
if (successfulRanges.isEmpty() && failedRanges.isEmpty()) {
getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile});
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) });
} catch (final ProcessException pe) {
error = true;
session.commit();
return;
}
// There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can
// just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
// 'success' while we send the others to 'failure'.
final long offset = lastMessageOffset.get();
if (offset == 0L) {
// all of the messages failed to send. Route FlowFile to failure
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() });
if (successfulRanges.isEmpty()) {
getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason});
session.transfer(session.penalize(flowFile), REL_FAILURE);
session.commit();
return;
}
if (failedRanges.isEmpty()) {
final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime);
if (successfulRanges.size() == 1) {
final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset();
final String msg = "Sent 1 message" + ((kafkaOffset == null) ? "" : ("; Kafka offset = " + kafkaOffset));
session.getProvenanceReporter().send(flowFile, getTransitUri(), msg);
} else {
// Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset();
getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into"
+ " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
for (final Range range : successfulRanges) {
smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset());
largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset());
}
session.transfer(successfulMessages, REL_SUCCESS);
session.transfer(session.penalize(failedMessages), REL_FAILURE);
session.getProvenanceReporter().send(flowFile, getTransitUri(),
"Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset);
}
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis});
session.commit();
return;
}
// At this point, the list of successful ranges and the list of failed ranges are both non-empty. This indicates that some messages made their way to Kafka
// successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success'
// and the failed messages to 'failure'.
transferRanges(successfulRanges, REL_SUCCESS);
transferRanges(failedRanges, REL_FAILURE);
session.remove(flowFile);
session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
}
} finally {
if (error) {
producer.close();
} else {
returnProducer(producer);
getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}",
new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason});
session.commit();
}
}
}
}
}


@ -22,27 +22,33 @@ import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import kafka.common.FailedToSendMessageException;
import kafka.javaapi.producer.Producer;
import kafka.message.CompressionCodec;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Assert;
import org.junit.Test;
import scala.collection.Seq;
import kafka.common.FailedToSendMessageException;
public class TestPutKafka {
@ -56,24 +62,19 @@ public class TestPutKafka {
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
runner.run();
runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles.
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<byte[]> messages = proc.getProducer().getMessages();
final List<ProducerRecord<byte[], byte[]>> messages = ((MockProducer) proc.getProducer()).getMessages();
assertEquals(11, messages.size());
assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3)));
assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4)));
assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5)));
assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6)));
assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7)));
assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8)));
assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value()));
assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value()));
for (int i = 1; i <= 9; i++) {
assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value()));
}
}
@Test
@ -87,7 +88,7 @@ public class TestPutKafka {
final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
runner.enqueue(text.getBytes());
runner.run();
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
@ -96,7 +97,7 @@ public class TestPutKafka {
@Test
public void testPartialFailure() {
final TestableProcessor proc = new TestableProcessor(2);
final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages.
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
@ -106,7 +107,7 @@ public class TestPutKafka {
final byte[] bytes = "1\n2\n3\n4".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
@ -118,6 +119,39 @@ public class TestPutKafka {
failureFF.assertContentEquals("3\n4");
}
@Test
public void testPartialFailureWithSuccessBeforeAndAfter() {
final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes();
runner.enqueue(bytes);
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 2);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
for (final MockFlowFile successFF : success) {
if ('1' == successFF.toByteArray()[0]) {
successFF.assertContentEquals("1\n2\n");
} else if ('5' == successFF.toByteArray()[0]) {
successFF.assertContentEquals("5\n6");
} else {
Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray()));
}
}
final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
failureFF.assertContentEquals("3\n4\n");
}
@Test
public void testWithEmptyMessages() {
final TestableProcessor proc = new TestableProcessor();
@ -129,16 +163,16 @@ public class TestPutKafka {
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<byte[]> msgs = proc.getProducer().getMessages();
final List<ProducerRecord<byte[], byte[]>> msgs = ((MockProducer) proc.getProducer()).getMessages();
assertEquals(4, msgs.size());
assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
for (int i = 1; i <= 4; i++) {
assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value()));
}
}
@Test
@ -154,14 +188,14 @@ public class TestPutKafka {
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
assertEquals("kafka://topic1", event.getTransitUri());
assertEquals("Sent 4 messages", event.getDetails());
assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
assertTrue(event.getDetails().startsWith("Sent 4 messages"));
}
@Test
@ -175,266 +209,271 @@ public class TestPutKafka {
final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
runner.enqueue(bytes);
runner.run();
runner.run(2);
final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
assertEquals(1, events.size());
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.SEND, event.getEventType());
assertEquals("kafka://topic1", event.getTransitUri());
assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri());
}
@Test
@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePut() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
public void testRoundRobinAcrossMultipleMessages() {
final TestableProcessor proc = new TestableProcessor();
keyValuePutExecute(runner);
}
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
@Test
@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePutAsync() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
runner.setProperty(PutKafka.KEY, "${kafka.key}");
runner.setProperty(PutKafka.TIMEOUT, "3 secs");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
keyValuePutExecute(runner);
}
private void keyValuePutExecute(final TestRunner runner) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", "test");
attributes.put("kafka.key", "key3");
final byte[] data = "Hello, World, Again! ;)".getBytes();
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue(data, attributes);
runner.enqueue("hello".getBytes());
runner.enqueue("there".getBytes());
runner.enqueue("how are you".getBytes());
runner.enqueue("today".getBytes());
runner.run(5);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
final MockFlowFile mff = mffs.get(0);
assertTrue(Arrays.equals(data, mff.toByteArray()));
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, records.get(i).partition().intValue());
}
assertEquals(1, records.get(3).partition().intValue());
}
@Test
public void testProducerConfigDefault() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING);
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes());
// Check the codec
final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.NoCompressionCodec$);
runner.run(2);
// Check compressed topics
final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(0, compressedTopics.size());
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1, records.get(i).partition().intValue());
}
assertEquals(1, records.get(3).partition().intValue());
}
@Test
public void testProducerConfigAsyncWithCompression() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testUserDefinedPartition() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue());
runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${part}");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
final Map<String, String> attrs = new HashMap<>();
attrs.put("part", "3");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
// Check that the codec is snappy
final CompressionCodec codec = config.compressionCodec();
assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$);
runner.run(2);
// Check compressed topics
final Seq<String> compressedTopics = config.compressedTopics();
assertEquals(3, compressedTopics.size());
assertTrue(compressedTopics.contains("topic01"));
assertTrue(compressedTopics.contains("topic02"));
assertTrue(compressedTopics.contains("topic03"));
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
for (int i = 0; i < 4; i++) {
assertEquals(3, records.get(i).partition().intValue());
}
}
@Test
public void testProducerConfigAsyncQueueThresholds() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testUserDefinedPartitionWithInvalidValue() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535");
runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms");
runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
runner.setProperty(PutKafka.PARTITION, "${part}");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
final Map<String, String> attrs = new HashMap<>();
attrs.put("part", "bogus");
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs);
// Check that the queue thresholds were properly translated
assertEquals(7000, config.queueBufferingMaxMs());
assertEquals(535, config.queueBufferingMaxMessages());
assertEquals(200, config.queueEnqueueTimeoutMs());
runner.run(2);
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
final List<ProducerRecord<byte[], byte[]>> records = ((MockProducer) proc.getProducer()).getMessages();
// all records should land on the same partition, whichever partition that happens to be.
final int partition = records.get(0).partition().intValue();
for (int i = 0; i < 4; i++) {
assertEquals(partition, records.get(i).partition().intValue());
}
}
@Test
public void testProducerConfigInvalidBatchSize() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
public void testFullBuffer() {
final TestableProcessor proc = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(proc);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200");
runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B");
proc.setMaxQueueSize(10L); // each record takes 4 bytes for the key ("key1") and 1 byte for the value, so only 2 records fit.
runner.assertNotValid();
runner.enqueue("1\n2\n3\n4\n".getBytes());
runner.run(2);
runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n");
runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n");
}
@Test
public void testProducerConfigAsyncDefaultEnqueueTimeout() {
final TestableProcessor processor = new TestableProcessor();
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setProperty(PutKafka.TOPIC, "topic1");
runner.setProperty(PutKafka.KEY, "key1");
runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue());
// Do not set QUEUE_ENQUEUE_TIMEOUT
final ProcessContext context = runner.getProcessContext();
final ProducerConfig config = processor.createConfig(context);
// Check that the enqueue timeout defaults to -1
assertEquals(-1, config.queueEnqueueTimeoutMs());
// Check the producer type
final String actualProducerType = config.producerType();
assertEquals("async", actualProducerType);
}
private static class TestableProcessor extends PutKafka {
private MockProducer producer;
private int failAfter = Integer.MAX_VALUE;
public TestableProcessor() {
}
public TestableProcessor(final int failAfter) {
this.failAfter = failAfter;
}
@OnScheduled
public void instantiateProducer(final ProcessContext context) {
producer = new MockProducer(createConfig(context));
producer.setFailAfter(failAfter);
}
@Override
protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
return producer;
}
public MockProducer getProducer() {
return producer;
}
/**
* Exposed for test verification
* Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used
*/
private static class TestableProcessor extends PutKafka {
private final MockProducer producer;
public TestableProcessor() {
this(null);
}
public TestableProcessor(final Integer failAfter) {
this(failAfter, null);
}
public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) {
producer = new MockProducer();
producer.setFailAfter(failAfter);
producer.setStopFailingAfter(stopFailingAfter);
}
@Override
public ProducerConfig createConfig(final ProcessContext context) {
return super.createConfig(context);
protected Producer<byte[], byte[]> getProducer() {
return producer;
}
public void setMaxQueueSize(final long bytes) {
producer.setMaxQueueSize(bytes);
}
}
private static class MockProducer extends Producer<byte[], byte[]> {
/**
* We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied
* Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it
* to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor,
* this doesn't allow us to test failure conditions adequately.
*/
private static class MockProducer implements Producer<byte[], byte[]> {
private int sendCount = 0;
private int failAfter = Integer.MAX_VALUE;
private Integer failAfter;
private Integer stopFailingAfter;
private long queueSize = 0L;
private long maxQueueSize = Long.MAX_VALUE;
private final List<byte[]> messages = new ArrayList<>();
private final List<ProducerRecord<byte[], byte[]>> messages = new ArrayList<>();
public MockProducer(final ProducerConfig config) {
super(config);
public MockProducer() {
}
@Override
public void send(final KeyedMessage<byte[], byte[]> message) {
if (++sendCount > failAfter) {
throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
} else {
messages.add(message.message());
}
public void setMaxQueueSize(final long bytes) {
this.maxQueueSize = bytes;
}
public List<byte[]> getMessages() {
public List<ProducerRecord<byte[], byte[]>> getMessages() {
return messages;
}
@Override
public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
for (final KeyedMessage<byte[], byte[]> msg : messages) {
send(msg);
}
}
public void setFailAfter(final int successCount) {
public void setFailAfter(final Integer successCount) {
failAfter = successCount;
}
public void setStopFailingAfter(final Integer stopFailingAfter) {
this.stopFailingAfter = stopFailingAfter;
}
@Override
public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record) {
return send(record, null);
}
@Override
public Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
sendCount++;
final ByteArraySerializer serializer = new ByteArraySerializer();
final int keyBytes = serializer.serialize(record.topic(), record.key()).length;
final int valueBytes = serializer.serialize(record.topic(), record.value()).length;
if (maxQueueSize - queueSize < keyBytes + valueBytes) {
throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes));
}
queueSize += keyBytes + valueBytes;
if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) {
final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
callback.onCompletion(null, e);
} else {
messages.add(record);
final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L);
callback.onCompletion(meta, null);
}
// we don't actually look at the Future in the processor, so we can just return null
return null;
}
@Override
public List<PartitionInfo> partitionsFor(String topic) {
final Node leader = new Node(1, "localhost", 1111);
final Node node2 = new Node(2, "localhost-2", 2222);
final Node node3 = new Node(3, "localhost-3", 3333);
final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]);
final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]);
final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]);
final List<PartitionInfo> infos = new ArrayList<>(3);
infos.add(partInfo1);
infos.add(partInfo2);
infos.add(partInfo3);
return infos;
}
@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.emptyMap();
}
@Override
public void close() {
}
}
}