From 22de23baa6ae7318574ccb112d03daf8f8dfa10e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 3 Nov 2015 16:17:32 -0500 Subject: [PATCH] NIFI-1097: Rewrite PutKafka to use the new producer api --- .../nifi-kafka-processors/pom.xml | 5 + .../nifi/processors/kafka/PutKafka.java | 720 ++++++++++++------ .../nifi/processors/kafka/TestPutKafka.java | 445 ++++++----- 3 files changed, 734 insertions(+), 436 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index c2e86fec07..1a8dc9d048 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -34,6 +34,11 @@ org.apache.nifi nifi-utils + + org.apache.kafka + kafka-clients + 0.8.2.2 + org.apache.kafka kafka_2.9.1 diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 09025a4c96..ea7f7bbb11 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -21,30 +21,47 @@ import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Properties; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.regex.Pattern; +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; @@ -56,21 +73,20 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import scala.actors.threadpool.Arrays; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) -@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") -public class PutKafka extends AbstractProcessor { +@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " + + "user-specified delimiter, such as a new-line.") +@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed +public class PutKafka extends AbstractSessionFactoryProcessor { private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to" + public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to" + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" @@ -79,16 +95,6 @@ public class PutKafka extends AbstractProcessor { + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + " in data loss."); - /** - * AllowableValue for a Producer Type that synchronously sends messages to Kafka - */ - public static final AllowableValue PRODUCTER_TYPE_SYNCHRONOUS = new AllowableValue("sync", "Synchronous", "Send FlowFiles to Kafka immediately."); - - /** - * AllowableValue for a Producer Type that asynchronously sends messages to Kafka - */ - public static final AllowableValue PRODUCTER_TYPE_ASYNCHRONOUS = new AllowableValue("async", "Asynchronous", "Batch messages before sending them to Kafka." - + " While this will improve throughput, it opens the possibility that a failure on the client machine will drop unsent data."); /** * AllowableValue for sending messages to Kafka without compression @@ -105,6 +111,13 @@ public class PutKafka extends AbstractProcessor { */ public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary."); + static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", + "Messages will be assigned to random partitions."); + static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", + "The property will be used to determine the partition. All messages within the same FlowFile will be assigned to the same partition."); + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() .name("Known Brokers") @@ -120,6 +133,21 @@ public class PutKafka extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) .build(); + static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() + .name("Partition Strategy") + .description("Specifies how messages should be partitioned when sent to Kafka") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) + .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) + .required(true) + .build(); + public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() + .name("Partition") + .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. " + + "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() .name("Kafka Key") .description("The Key to use for the Message") @@ -140,7 +168,10 @@ public class PutKafka extends AbstractProcessor { .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + "If not specified, the entire content of the FlowFile will be used as a single message. " + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") + + "sent as a separate Kafka message. Note that if messages are delimited and some messages for a given FlowFile " + + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those " + + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' " + + "relationship.") .required(false) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true) @@ -151,6 +182,13 @@ public class PutKafka extends AbstractProcessor { .required(true) .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) .expressionLanguageSupported(false) + .defaultValue("5 MB") + .build(); + static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() + .name("Max Record Size") + .description("The maximum size that any individual record can be.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .required(true) .defaultValue("1 MB") .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() @@ -168,20 +206,10 @@ public class PutKafka extends AbstractProcessor { .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(false) .build(); - public static final PropertyDescriptor PRODUCER_TYPE = new PropertyDescriptor.Builder() - .name("Producer Type") - .description("This parameter specifies whether the messages are sent asynchronously in a background thread.") - .required(true) - .allowableValues(PRODUCTER_TYPE_SYNCHRONOUS, PRODUCTER_TYPE_ASYNCHRONOUS) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue(PRODUCTER_TYPE_SYNCHRONOUS.getValue()) - .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() .name("Async Batch Size") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The number of messages to send in one batch when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode." - + " The producer will wait until either this number of messages are ready" + .displayName("Batch Size") + .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready" + " to send or \"Queue Buffering Max Time\" is reached.") .required(true) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) @@ -189,35 +217,13 @@ public class PutKafka extends AbstractProcessor { .build(); public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() .name("Queue Buffering Max Time") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " Maximum time to buffer data when using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode. For example a setting of 100 ms" - + " will try to batch together 100ms of messages to send at once. This will improve" + .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms" + + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve" + " throughput but adds message delivery latency due to the buffering.") .required(true) .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .defaultValue("5 secs") .build(); - public static final PropertyDescriptor QUEUE_BUFFERING_MAX_MESSAGES = new PropertyDescriptor.Builder() - .name("Queue Buffer Max Count") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The maximum number of unsent messages that can be queued up in the producer when" - + " using " + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode before either the producer must be blocked or data must be dropped.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000") - .build(); - public static final PropertyDescriptor QUEUE_ENQUEUE_TIMEOUT = new PropertyDescriptor.Builder() - .name("Queue Enqueue Timeout") - .description("Used only if Producer Type is set to \"" + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + "\"." - + " The amount of time to block before dropping messages when running in " - + PRODUCTER_TYPE_ASYNCHRONOUS.getDisplayName() + " mode" - + " and the buffer has reached the \"Queue Buffer Max Count\". If set to 0, events will" - + " be enqueued immediately or dropped if the queue is full (the producer send call will" - + " never block). If not set, the producer will block indefinitely and never willingly" - + " drop a send.") - .required(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() .name("Compression Codec") .description("This parameter allows you to specify the compression codec for all" @@ -227,16 +233,6 @@ public class PutKafka extends AbstractProcessor { .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) .defaultValue(COMPRESSION_CODEC_NONE.getValue()) .build(); - public static final PropertyDescriptor COMPRESSED_TOPICS = new PropertyDescriptor.Builder() - .name("Compressed Topics") - .description("This parameter allows you to set whether compression should be turned on" - + " for particular topics. If the compression codec is anything other than" - + " \"" + COMPRESSION_CODEC_NONE.getDisplayName() + "\", enable compression only for specified topics if any." - + " If the list of compressed topics is empty, then enable the specified" - + " compression codec for all topics. If the compression codec is " + COMPRESSION_CODEC_NONE.getDisplayName() + "," - + " compression is disabled for all topics") - .required(false) - .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -247,7 +243,13 @@ public class PutKafka extends AbstractProcessor { .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") .build(); - private final BlockingQueue> producers = new LinkedBlockingQueue<>(); + private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); + private final BlockingQueue completeBatches = new LinkedBlockingQueue<>(); + private final Set activeBatches = Collections.synchronizedSet(new HashSet()); + + private final ConcurrentMap partitionIndexMap = new ConcurrentHashMap<>(); + + private volatile Producer producer; @Override protected List getSupportedPropertyDescriptors() { @@ -259,36 +261,21 @@ public class PutKafka extends AbstractProcessor { final List props = new ArrayList<>(); props.add(SEED_BROKERS); props.add(TOPIC); + props.add(PARTITION_STRATEGY); + props.add(PARTITION); props.add(KEY); props.add(DELIVERY_GUARANTEE); props.add(MESSAGE_DELIMITER); props.add(MAX_BUFFER_SIZE); + props.add(MAX_RECORD_SIZE); props.add(TIMEOUT); - props.add(PRODUCER_TYPE); props.add(BATCH_NUM_MESSAGES); - props.add(QUEUE_BUFFERING_MAX_MESSAGES); props.add(QUEUE_BUFFERING_MAX); - props.add(QUEUE_ENQUEUE_TIMEOUT); props.add(COMPRESSION_CODEC); - props.add(COMPRESSED_TOPICS); props.add(clientName); return props; } - @Override - public Collection customValidate(final ValidationContext context) { - final List errors = new ArrayList<>(super.customValidate(context)); - - final Integer batchMessages = context.getProperty(BATCH_NUM_MESSAGES).asInteger(); - final Integer bufferMaxMessages = context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).asInteger(); - - if (batchMessages > bufferMaxMessages) { - errors.add(new ValidationResult.Builder().subject("Batch Size, Queue Buffer").valid(false) - .explanation("Batch Size (" + batchMessages + ") must be equal to or less than the Queue Buffer Max Count (" + bufferMaxMessages + ")").build()); - } - - return errors; - } @Override public Set getRelationships() { @@ -298,71 +285,131 @@ public class PutKafka extends AbstractProcessor { return relationships; } - @OnStopped - public void closeProducers() { - Producer producer; + @Override + protected Collection customValidate(final ValidationContext validationContext) { + final List results = new ArrayList<>(); - while ((producer = producers.poll()) != null) { + final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); + if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) { + results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation( + "The property must be set when configured to use the User-Defined Partitioning Strategy").build()); + } + + return results; + } + + protected Producer getProducer() { + return producer; + } + + @OnStopped + public void cleanup() { + final Producer producer = getProducer(); + if (producer != null) { producer.close(); } + + for (final FlowFileMessageBatch batch : activeBatches) { + batch.cancelOrComplete(); + } } - protected ProducerConfig createConfig(final ProcessContext context) { + @OnScheduled + public void createProducer(final ProcessContext context) { + producer = new KafkaProducer(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer()); + } + + protected int getActiveMessageBatchCount() { + return activeBatches.size(); + } + + protected int getCompleteMessageBatchCount() { + return completeBatches.size(); + } + + protected Properties createConfig(final ProcessContext context) { final String brokers = context.getProperty(SEED_BROKERS).getValue(); final Properties properties = new Properties(); - properties.setProperty("metadata.broker.list", brokers); - properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); + properties.setProperty("bootstrap.servers", brokers); + properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); - properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue())); - properties.setProperty("message.send.max.retries", "1"); - properties.setProperty("producer.type", context.getProperty(PRODUCER_TYPE).getValue()); - properties.setProperty("batch.num.messages", context.getProperty(BATCH_NUM_MESSAGES).getValue()); + final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + properties.setProperty("timeout.ms", timeout); + properties.setProperty("metadata.fetch.timeout.ms", timeout); + + properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); + properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue())); + + final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); + properties.setProperty("buffer.memory", String.valueOf(maxBufferSize)); + + final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); + properties.setProperty("compression.type", compressionCodec); final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); if (queueBufferingMillis != null) { - properties.setProperty("queue.buffering.max.ms", String.valueOf(queueBufferingMillis)); - } - properties.setProperty("queue.buffering.max.messages", context.getProperty(QUEUE_BUFFERING_MAX_MESSAGES).getValue()); - - final Long queueEnqueueTimeoutMillis = context.getProperty(QUEUE_ENQUEUE_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS); - if (queueEnqueueTimeoutMillis != null) { - properties.setProperty("queue.enqueue.timeout.ms", String.valueOf(queueEnqueueTimeoutMillis)); + properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis)); } - final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); - properties.setProperty("compression.codec", compressionCodec); + properties.setProperty("retries", "0"); + properties.setProperty("block.on.buffer.full", "false"); - final String compressedTopics = context.getProperty(COMPRESSED_TOPICS).getValue(); - if (compressedTopics != null) { - properties.setProperty("compressed.topics", compressedTopics); + return properties; + } + + private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) { + final long unnormalizedIndex; + + final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); + if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { + AtomicLong partitionIndex = partitionIndexMap.get(topic); + if (partitionIndex == null) { + partitionIndex = new AtomicLong(0L); + final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex); + if (existing != null) { + partitionIndex = existing; + } + } + + unnormalizedIndex = partitionIndex.getAndIncrement(); + } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { + return null; + } else { + if (context.getProperty(PARTITION).isSet()) { + final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + + if (NUMBER_PATTERN.matcher(partitionValue).matches()) { + // Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions. + unnormalizedIndex = Long.parseLong(partitionValue) - 1; + } else { + unnormalizedIndex = partitionValue.hashCode(); + } + } else { + return null; + } } - return new ProducerConfig(properties); - } - - protected Producer createProducer(final ProcessContext context) { - return new Producer<>(createConfig(context)); - } - - private Producer borrowProducer(final ProcessContext context) { - final Producer producer = producers.poll(); - return producer == null ? createProducer(context) : producer; - } - - private void returnProducer(final Producer producer) { - producers.offer(producer); + final Producer producer = getProducer(); + final List partitionInfos = producer.partitionsFor(topic); + final int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size()); + return partitionInfos.get(partitionIdx).partition(); } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { + FlowFileMessageBatch batch; + while ((batch = completeBatches.poll()) != null) { + batch.completeSession(); + } + + final ProcessSession session = sessionFactory.createSession(); final FlowFile flowFile = session.get(); if (flowFile == null) { return; } - final long start = System.nanoTime(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); @@ -371,8 +418,7 @@ public class PutKafka extends AbstractProcessor { delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); } - final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); - final Producer producer = borrowProducer(context); + final Producer producer = getProducer(); if (delimiter == null) { // Send the entire FlowFile as a single message. @@ -384,31 +430,38 @@ public class PutKafka extends AbstractProcessor { } }); - boolean error = false; + final Integer partition; try { - final KeyedMessage message; - if (key == null) { - message = new KeyedMessage<>(topic, value); - } else { - message = new KeyedMessage<>(topic, keyBytes, value); - } - - producer.send(message); - final long nanos = System.nanoTime() - start; - - session.getProvenanceReporter().send(flowFile, "kafka://" + topic); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] { flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); + partition = getPartition(context, flowFile, topic); } catch (final Exception e) { - getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] { flowFile, e }); + getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); session.transfer(session.penalize(flowFile), REL_FAILURE); - error = true; - } finally { - if (error) { - producer.close(); - } else { - returnProducer(producer); - } + session.commit(); + return; + } + + final ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value); + + final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); + messageBatch.setNumMessages(1); + activeBatches.add(messageBatch); + + try { + producer.send(producerRecord, new Callback() { + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception == null) { + // record was successfully sent. + messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset()); + } else { + messageBatch.addFailedRange(0L, flowFile.getSize(), exception); + } + } + }); + } catch (final BufferExhaustedException bee) { + messageBatch.addFailedRange(0L, flowFile.getSize(), bee); + context.yield(); + return; } } else { final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); @@ -418,9 +471,9 @@ public class PutKafka extends AbstractProcessor { // the stream of bytes in the FlowFile final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); - boolean error = false; - final LongHolder lastMessageOffset = new LongHolder(0L); final LongHolder messagesSent = new LongHolder(0L); + final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); + activeBatches.add(messageBatch); try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { session.read(flowFile, new InputStreamCallback() { @@ -430,13 +483,12 @@ public class PutKafka extends AbstractProcessor { boolean streamFinished = false; - final List> messages = new ArrayList<>(); // batch to send - long messageBytes = 0L; // size of messages in the 'messages' list - int nextByte; try (final InputStream bufferedIn = new BufferedInputStream(rawIn); final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + long messageStartOffset = in.getBytesConsumed(); + // read until we're out of data. while (!streamFinished) { nextByte = in.read(); @@ -457,107 +509,309 @@ public class PutKafka extends AbstractProcessor { } if (data != null) { + final long messageEndOffset = in.getBytesConsumed(); + // If the message has no data, ignore it. if (data.length != 0) { - // either we ran out of data or we reached the end of the message. - // Either way, create the message because it's ready to send. - final KeyedMessage message; - if (key == null) { - message = new KeyedMessage<>(topic, data); - } else { - message = new KeyedMessage<>(topic, keyBytes, data); + final Integer partition; + try { + partition = getPartition(context, flowFile, topic); + } catch (final Exception e) { + messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e); + getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); + continue; } - // Add the message to the list of messages ready to send. If we've reached our - // threshold of how many we're willing to send (or if we're out of data), go ahead - // and send the whole List. - messages.add(message); - messageBytes += data.length; - if (messageBytes >= maxBufferSize || streamFinished) { - // send the messages, then reset our state. - try { - producer.send(messages); - } catch (final Exception e) { - // we wrap the general exception in ProcessException because we want to separate - // failures in sending messages from general Exceptions that would indicate bugs - // in the Processor. Failure to send a message should be handled appropriately, but - // we don't want to catch the general Exception or RuntimeException in order to catch - // failures from Kafka's Producer. - throw new ProcessException("Failed to send messages to Kafka", e); - } - messagesSent.addAndGet(messages.size()); // count number of messages sent + final ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data); + final long rangeStart = messageStartOffset; - // reset state - messages.clear(); - messageBytes = 0; + try { + producer.send(producerRecord, new Callback() { + @Override + public void onCompletion(final RecordMetadata metadata, final Exception exception) { + if (exception == null) { + // record was successfully sent. + messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset()); + } else { + messageBatch.addFailedRange(rangeStart, messageEndOffset, exception); + } + } + }); - // We've successfully sent a batch of messages. Keep track of the byte offset in the - // FlowFile of the last successfully sent message. This way, if the messages cannot - // all be successfully sent, we know where to split off the data. This allows us to then - // split off the first X number of bytes and send to 'success' and then split off the rest - // and send them to 'failure'. - lastMessageOffset.set(in.getBytesConsumed()); + messagesSent.incrementAndGet(); + } catch (final BufferExhaustedException bee) { + // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range + messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee); + context.yield(); + return; } + } + // reset BAOS so that we can start a new message. baos.reset(); data = null; - - } - } - - // If there are messages left, send them - if (!messages.isEmpty()) { - try { - messagesSent.addAndGet(messages.size()); // add count of messages - producer.send(messages); - } catch (final Exception e) { - throw new ProcessException("Failed to send messages to Kafka", e); + messageStartOffset = in.getBytesConsumed(); } } } } }); - final long nanos = System.nanoTime() - start; - session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] { messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos) }); - } catch (final ProcessException pe) { - error = true; - - // There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can - // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to - // 'success' while we send the others to 'failure'. - final long offset = lastMessageOffset.get(); - if (offset == 0L) { - // all of the messages failed to send. Route FlowFile to failure - getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] { flowFile, pe.getCause() }); - session.transfer(session.penalize(flowFile), REL_FAILURE); - } else { - // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. - final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); - final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); - - getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into" - + " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] { - messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); - - session.transfer(successfulMessages, REL_SUCCESS); - session.transfer(session.penalize(failedMessages), REL_FAILURE); - session.remove(flowFile); - session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); - } - } finally { - if (error) { - producer.close(); - } else { - returnProducer(producer); - } + messageBatch.setNumMessages(messagesSent.get()); } - } } + + private static class Range { + private final long start; + private final long end; + private final Long kafkaOffset; + + public Range(final long start, final long end, final Long kafkaOffset) { + this.start = start; + this.end = end; + this.kafkaOffset = kafkaOffset; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public Long getKafkaOffset() { + return kafkaOffset; + } + + @Override + public String toString() { + return "Range[" + start + "-" + end + "]"; + } + } + + private class FlowFileMessageBatch { + private final ProcessSession session; + private final FlowFile flowFile; + private final String topic; + private final long startTime = System.nanoTime(); + + private final List successfulRanges = new ArrayList<>(); + private final List failedRanges = new ArrayList<>(); + + private Exception lastFailureReason; + private long numMessages = -1L; + private long completeTime = 0L; + private boolean canceled = false; + + public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) { + this.session = session; + this.flowFile = flowFile; + this.topic = topic; + } + + public synchronized void cancelOrComplete() { + if (isComplete()) { + completeSession(); + return; + } + + this.canceled = true; + + session.rollback(); + successfulRanges.clear(); + failedRanges.clear(); + } + + public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) { + if (canceled) { + return; + } + + successfulRanges.add(new Range(start, end, kafkaOffset)); + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + public synchronized void addFailedRange(final long start, final long end, final Exception e) { + if (canceled) { + return; + } + + failedRanges.add(new Range(start, end, null)); + lastFailureReason = e; + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + private boolean isComplete() { + return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages); + } + + public synchronized void setNumMessages(final long msgCount) { + this.numMessages = msgCount; + + if (isComplete()) { + activeBatches.remove(this); + completeBatches.add(this); + completeTime = System.nanoTime(); + } + } + + private Long getMin(final Long a, final Long b) { + if (a == null && b == null) { + return null; + } + + if (a == null) { + return b; + } + + if (b == null) { + return a; + } + + return Math.min(a, b); + } + + private Long getMax(final Long a, final Long b) { + if (a == null && b == null) { + return null; + } + + if (a == null) { + return b; + } + + if (b == null) { + return a; + } + + return Math.max(a, b); + } + + private void transferRanges(final List ranges, final Relationship relationship) { + Collections.sort(ranges, new Comparator() { + @Override + public int compare(final Range o1, final Range o2) { + return Long.compare(o1.getStart(), o2.getStart()); + } + }); + + for (int i = 0; i < ranges.size(); i++) { + Range range = ranges.get(i); + int count = 1; + Long smallestKafkaOffset = range.getKafkaOffset(); + Long largestKafkaOffset = range.getKafkaOffset(); + + while (i + 1 < ranges.size()) { + // Check if the next range in the List continues where this one left off. + final Range nextRange = ranges.get(i + 1); + + if (nextRange.getStart() == range.getEnd()) { + // We have two ranges in a row that are contiguous; combine them into a single Range. + range = new Range(range.getStart(), nextRange.getEnd(), null); + + smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset()); + largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset()); + count++; + i++; + } else { + break; + } + } + + // Create a FlowFile for this range. + FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart()); + if (relationship == REL_SUCCESS) { + session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); + session.transfer(child, relationship); + } else { + session.transfer(session.penalize(child), relationship); + } + } + } + + private String getTransitUri() { + final List partitions = getProducer().partitionsFor(topic); + if (partitions.isEmpty()) { + return "kafka://unknown-host" + "/topics/" + topic; + } + + final PartitionInfo info = partitions.get(0); + final Node leader = info.leader(); + final String host = leader.host(); + final int port = leader.port(); + + return "kafka://" + host + ":" + port + "/topics/" + topic; + } + + public synchronized void completeSession() { + if (canceled) { + return; + } + + if (successfulRanges.isEmpty() && failedRanges.isEmpty()) { + getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + return; + } + + if (successfulRanges.isEmpty()) { + getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason}); + session.transfer(session.penalize(flowFile), REL_FAILURE); + session.commit(); + return; + } + + if (failedRanges.isEmpty()) { + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime); + + if (successfulRanges.size() == 1) { + final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset(); + final String msg = "Sent 1 message" + ((kafkaOffset == null) ? "" : ("; Kafka offset = " + kafkaOffset)); + session.getProvenanceReporter().send(flowFile, getTransitUri(), msg); + } else { + long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); + long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); + + for (final Range range : successfulRanges) { + smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset()); + largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset()); + } + + session.getProvenanceReporter().send(flowFile, getTransitUri(), + "Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); + } + + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis}); + session.commit(); + return; + } + + // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way to Kafka + // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success' + // and the failed messages to 'failure'. + transferRanges(successfulRanges, REL_SUCCESS); + transferRanges(failedRanges, REL_FAILURE); + session.remove(flowFile); + getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}", + new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason}); + session.commit(); + } + } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 750d40691e..17d1cc831a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -22,27 +22,33 @@ import static org.junit.Assert.assertTrue; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; -import kafka.common.FailedToSendMessageException; -import kafka.javaapi.producer.Producer; -import kafka.message.CompressionCodec; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; - -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.processor.ProcessContext; +import org.apache.kafka.clients.producer.BufferExhaustedException; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.junit.Ignore; +import org.junit.Assert; import org.junit.Test; -import scala.collection.Seq; +import kafka.common.FailedToSendMessageException; + public class TestPutKafka { @@ -56,24 +62,19 @@ public class TestPutKafka { runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes()); - runner.run(); + runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles. runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - final List messages = proc.getProducer().getMessages(); + final List> messages = ((MockProducer) proc.getProducer()).getMessages(); assertEquals(11, messages.size()); - assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0))); - assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1))); - assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2))); - assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3))); - assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4))); - assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5))); - assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6))); - assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7))); - assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8))); - assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9))); - assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10))); + assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value())); + assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value())); + + for (int i = 1; i <= 9; i++) { + assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value())); + } } @Test @@ -87,7 +88,7 @@ public class TestPutKafka { final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9"; runner.enqueue(text.getBytes()); - runner.run(); + runner.run(2); runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); @@ -96,7 +97,7 @@ public class TestPutKafka { @Test public void testPartialFailure() { - final TestableProcessor proc = new TestableProcessor(2); + final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages. final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); @@ -106,7 +107,7 @@ public class TestPutKafka { final byte[] bytes = "1\n2\n3\n4".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); runner.assertTransferCount(PutKafka.REL_FAILURE, 1); @@ -118,6 +119,39 @@ public class TestPutKafka { failureFF.assertContentEquals("3\n4"); } + @Test + public void testPartialFailureWithSuccessBeforeAndAfter() { + final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4 + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); + + final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes(); + runner.enqueue(bytes); + runner.run(2); + + runner.assertTransferCount(PutKafka.REL_SUCCESS, 2); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + final List success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); + for (final MockFlowFile successFF : success) { + if ('1' == successFF.toByteArray()[0]) { + successFF.assertContentEquals("1\n2\n"); + } else if ('5' == successFF.toByteArray()[0]) { + successFF.assertContentEquals("5\n6"); + } else { + Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray())); + } + } + + final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); + failureFF.assertContentEquals("3\n4\n"); + } + + @Test public void testWithEmptyMessages() { final TestableProcessor proc = new TestableProcessor(); @@ -129,16 +163,16 @@ public class TestPutKafka { final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - final List msgs = proc.getProducer().getMessages(); + final List> msgs = ((MockProducer) proc.getProducer()).getMessages(); assertEquals(4, msgs.size()); - assertTrue(Arrays.equals("1".getBytes(), msgs.get(0))); - assertTrue(Arrays.equals("2".getBytes(), msgs.get(1))); - assertTrue(Arrays.equals("3".getBytes(), msgs.get(2))); - assertTrue(Arrays.equals("4".getBytes(), msgs.get(3))); + + for (int i = 1; i <= 4; i++) { + assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value())); + } } @Test @@ -154,14 +188,14 @@ public class TestPutKafka { final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); final List events = runner.getProvenanceEvents(); assertEquals(1, events.size()); final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://topic1", event.getTransitUri()); - assertEquals("Sent 4 messages", event.getDetails()); + assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); + assertTrue(event.getDetails().startsWith("Sent 4 messages")); } @Test @@ -175,265 +209,270 @@ public class TestPutKafka { final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(); + runner.run(2); final List events = runner.getProvenanceEvents(); assertEquals(1, events.size()); final ProvenanceEventRecord event = events.get(0); assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://topic1", event.getTransitUri()); + assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); } @Test - @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") - public void testKeyValuePut() { - final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); - runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); - runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); - runner.setProperty(PutKafka.KEY, "${kafka.key}"); - runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); + public void testRoundRobinAcrossMultipleMessages() { + final TestableProcessor proc = new TestableProcessor(); - keyValuePutExecute(runner); - } + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); - @Test - @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") - public void testKeyValuePutAsync() { - final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); - runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); - runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); - runner.setProperty(PutKafka.KEY, "${kafka.key}"); - runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); - - keyValuePutExecute(runner); - } - - private void keyValuePutExecute(final TestRunner runner) { - final Map attributes = new HashMap<>(); - attributes.put("kafka.topic", "test"); - attributes.put("kafka.key", "key3"); - - final byte[] data = "Hello, World, Again! ;)".getBytes(); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); + runner.enqueue("hello".getBytes()); + runner.enqueue("there".getBytes()); + runner.enqueue("how are you".getBytes()); + runner.enqueue("today".getBytes()); runner.run(5); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4); - final List mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); - final MockFlowFile mff = mffs.get(0); - assertTrue(Arrays.equals(data, mff.toByteArray())); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, records.get(i).partition().intValue()); + } + + assertEquals(1, records.get(3).partition().intValue()); } @Test - public void testProducerConfigDefault() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); + runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes()); - // Check the codec - final CompressionCodec codec = config.compressionCodec(); - assertTrue(codec instanceof kafka.message.NoCompressionCodec$); + runner.run(2); - // Check compressed topics - final Seq compressedTopics = config.compressedTopics(); - assertEquals(0, compressedTopics.size()); + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals(PutKafka.PRODUCER_TYPE.getDefaultValue(), actualProducerType); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, records.get(i).partition().intValue()); + } + assertEquals(1, records.get(3).partition().intValue()); } + @Test - public void testProducerConfigAsyncWithCompression() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testUserDefinedPartition() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.COMPRESSION_CODEC, PutKafka.COMPRESSION_CODEC_SNAPPY.getValue()); - runner.setProperty(PutKafka.COMPRESSED_TOPICS, "topic01,topic02,topic03"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); + runner.setProperty(PutKafka.PARTITION, "${part}"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); + final Map attrs = new HashMap<>(); + attrs.put("part", "3"); + runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - // Check that the codec is snappy - final CompressionCodec codec = config.compressionCodec(); - assertTrue(codec instanceof kafka.message.SnappyCompressionCodec$); + runner.run(2); - // Check compressed topics - final Seq compressedTopics = config.compressedTopics(); - assertEquals(3, compressedTopics.size()); - assertTrue(compressedTopics.contains("topic01")); - assertTrue(compressedTopics.contains("topic02")); - assertTrue(compressedTopics.contains("topic03")); - - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals("async", actualProducerType); + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + for (int i = 0; i < 4; i++) { + assertEquals(3, records.get(i).partition().intValue()); + } } + + @Test - public void testProducerConfigAsyncQueueThresholds() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testUserDefinedPartitionWithInvalidValue() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX, "7 secs"); - runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "535"); - runner.setProperty(PutKafka.QUEUE_ENQUEUE_TIMEOUT, "200 ms"); + runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); + runner.setProperty(PutKafka.PARTITION, "${part}"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); + final Map attrs = new HashMap<>(); + attrs.put("part", "bogus"); + runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - // Check that the queue thresholds were properly translated - assertEquals(7000, config.queueBufferingMaxMs()); - assertEquals(535, config.queueBufferingMaxMessages()); - assertEquals(200, config.queueEnqueueTimeoutMs()); + runner.run(2); - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals("async", actualProducerType); + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + final List> records = ((MockProducer) proc.getProducer()).getMessages(); + // should all be the same partition, regardless of what partition it is. + final int partition = records.get(0).partition().intValue(); + + for (int i = 0; i < 4; i++) { + assertEquals(partition, records.get(i).partition().intValue()); + } } + @Test - public void testProducerConfigInvalidBatchSize() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); + public void testFullBuffer() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(PutKafka.TOPIC, "topic1"); runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - runner.setProperty(PutKafka.BATCH_NUM_MESSAGES, "200"); - runner.setProperty(PutKafka.QUEUE_BUFFERING_MAX_MESSAGES, "100"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B"); + proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for value. - runner.assertNotValid(); + runner.enqueue("1\n2\n3\n4\n".getBytes()); + runner.run(2); + runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n"); + runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n"); } - @Test - public void testProducerConfigAsyncDefaultEnqueueTimeout() { - - final TestableProcessor processor = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(processor); - - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.PRODUCER_TYPE, PutKafka.PRODUCTER_TYPE_ASYNCHRONOUS.getValue()); - // Do not set QUEUE_ENQUEUE_TIMEOUT - - final ProcessContext context = runner.getProcessContext(); - final ProducerConfig config = processor.createConfig(context); - - // Check that the enqueue timeout defaults to -1 - assertEquals(-1, config.queueEnqueueTimeoutMs()); - - // Check the producer type - final String actualProducerType = config.producerType(); - assertEquals("async", actualProducerType); - - } + /** + * Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used + */ private static class TestableProcessor extends PutKafka { - - private MockProducer producer; - private int failAfter = Integer.MAX_VALUE; + private final MockProducer producer; public TestableProcessor() { + this(null); } - public TestableProcessor(final int failAfter) { - this.failAfter = failAfter; + public TestableProcessor(final Integer failAfter) { + this(failAfter, null); } - @OnScheduled - public void instantiateProducer(final ProcessContext context) { - producer = new MockProducer(createConfig(context)); + public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) { + producer = new MockProducer(); producer.setFailAfter(failAfter); + producer.setStopFailingAfter(stopFailingAfter); } @Override - protected Producer createProducer(final ProcessContext context) { + protected Producer getProducer() { return producer; } - public MockProducer getProducer() { - return producer; - } - - /** - * Exposed for test verification - */ - @Override - public ProducerConfig createConfig(final ProcessContext context) { - return super.createConfig(context); + public void setMaxQueueSize(final long bytes) { + producer.setMaxQueueSize(bytes); } } - private static class MockProducer extends Producer { + + /** + * We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied + * Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it + * to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor, + * this doesn't allow us to test failure conditions adequately. + */ + private static class MockProducer implements Producer { private int sendCount = 0; - private int failAfter = Integer.MAX_VALUE; + private Integer failAfter; + private Integer stopFailingAfter; + private long queueSize = 0L; + private long maxQueueSize = Long.MAX_VALUE; - private final List messages = new ArrayList<>(); + private final List> messages = new ArrayList<>(); - public MockProducer(final ProducerConfig config) { - super(config); + public MockProducer() { } - @Override - public void send(final KeyedMessage message) { - if (++sendCount > failAfter) { - throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); - } else { - messages.add(message.message()); - } + public void setMaxQueueSize(final long bytes) { + this.maxQueueSize = bytes; } - public List getMessages() { + public List> getMessages() { return messages; } - @Override - public void send(final List> messages) { - for (final KeyedMessage msg : messages) { - send(msg); - } + public void setFailAfter(final Integer successCount) { + failAfter = successCount; } - public void setFailAfter(final int successCount) { - failAfter = successCount; + public void setStopFailingAfter(final Integer stopFailingAfter) { + this.stopFailingAfter = stopFailingAfter; + } + + @Override + public Future send(final ProducerRecord record) { + return send(record, null); + } + + @Override + public Future send(final ProducerRecord record, final Callback callback) { + sendCount++; + + final ByteArraySerializer serializer = new ByteArraySerializer(); + final int keyBytes = serializer.serialize(record.topic(), record.key()).length; + final int valueBytes = serializer.serialize(record.topic(), record.value()).length; + if (maxQueueSize - queueSize < keyBytes + valueBytes) { + throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes)); + } + + queueSize += keyBytes + valueBytes; + + if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) { + final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); + callback.onCompletion(null, e); + } else { + messages.add(record); + final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 1 : record.partition()), 0L, 0L); + callback.onCompletion(meta, null); + } + + // we don't actually look at the Future in the processor, so we can just return null + return null; + } + + @Override + public List partitionsFor(String topic) { + final Node leader = new Node(1, "localhost", 1111); + final Node node2 = new Node(2, "localhost-2", 2222); + final Node node3 = new Node(3, "localhost-3", 3333); + + final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]); + final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]); + final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]); + + final List infos = new ArrayList<>(3); + infos.add(partInfo1); + infos.add(partInfo2); + infos.add(partInfo3); + return infos; + } + + @Override + public Map metrics() { + return Collections.emptyMap(); + } + + @Override + public void close() { } }