From e0e00ff282510904e2705616164280cbd670a7d9 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 22 Mar 2016 10:50:07 -0400 Subject: [PATCH] NIFI-1645 refactored PutKafka - used newest API available in 0.8.* version - added PutKafka integration tests - Kafka module code coverage is at 85% NIFI-1645 polishing NIFI-1645 PR comments round 1 NIFI-1645 PR comments round 2 NIFI-1645 change to use async Kafka producer NIFI-1645 polishing NIFI-1645 polishing NIFI-1645 polishing NIFI-1645 changed from java.util.Scanner to custom StreamScanner NIFI-1645 polishing NIFI-1645 final polish --- .../nifi/processors/kafka/KafkaPublisher.java | 203 ++++ .../nifi/processors/kafka/Partitioners.java | 84 ++ .../nifi/processors/kafka/PutKafka.java | 1080 +++++------------ .../kafka/SplittableMessageContext.java | 114 ++ .../nifi/processors/kafka/StreamScanner.java | 93 ++ .../processors/kafka/KafkaPublisherTest.java | 205 ++++ .../nifi/processors/kafka/TestPutKafka.java | 549 +++------ .../src/test/resources/log4j.properties | 6 +- 8 files changed, 1172 insertions(+), 1162 deletions(-) create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java new file mode 100644 index 0000000000..e116978f2e --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka; + +import java.io.InputStream; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import kafka.producer.KeyedMessage; +import kafka.producer.Partitioner; + +/** + * Wrapper over {@link KafkaProducer} to assist {@link PutKafka} processor with + * sending content of {@link FlowFile}s to Kafka. + */ +public class KafkaPublisher implements AutoCloseable { + + private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class); + + private final KafkaProducer producer; + + private final Partitioner partitioner; + + private final long ackWaitTime; + + private ProcessorLog processLog; + + /** + * Creates an instance of this class as well as the instance of the + * corresponding Kafka {@link KafkaProducer} using provided Kafka + * configuration properties. + */ + KafkaPublisher(Properties kafkaProperties) { + kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + this.producer = new KafkaProducer(kafkaProperties); + this.ackWaitTime = Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2; + try { + if (kafkaProperties.containsKey("partitioner.class")){ + this.partitioner = (Partitioner) Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance(); + } else { + this.partitioner = null; + } + } catch (Exception e) { + throw new IllegalStateException("Failed to create partitioner", e); + } + } + + /** + * + */ + void setProcessLog(ProcessorLog processLog) { + this.processLog = processLog; + } + + /** + * Publishes messages to Kafka topic. It supports three publishing + * mechanisms. + *
+ * <ul>
+ * <li>Sending the entire content stream as a single Kafka message.</li>
+ * <li>Splitting the incoming content stream into chunks and sending
+ * individual chunks as separate Kafka messages.</li>
+ * <li>Splitting the incoming content stream into chunks and sending only
+ * the chunks that have failed previously @see
+ * {@link SplittableMessageContext#getFailedSegments()}.</li>
+ * </ul>
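For illustration, a minimal sketch of how this publish() API could be driven from a test in the same package (in the spirit of the new KafkaPublisherTest). The broker address, topic, delimiter, payload, and class name are assumed values for the example, not part of the patch:

package org.apache.nifi.processors.kafka;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.Properties;

public class KafkaPublisherUsageSketch {

    static BitSet publishDelimitedPayload() throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker
        kafkaProps.setProperty("timeout.ms", "30000"); // the constructor derives its ack wait time from this value

        try (KafkaPublisher publisher = new KafkaPublisher(kafkaProps)) {
            // topic name, optional key bytes, delimiter pattern
            SplittableMessageContext context = new SplittableMessageContext("test-topic", null, "\n");
            try (InputStream content = new ByteArrayInputStream(
                    "msg-1\nmsg-2\nmsg-3".getBytes(StandardCharsets.UTF_8))) {
                // a null partition key lets the configured partitioner pick the partition
                BitSet failed = publisher.publish(context, content, null);
                // an empty BitSet means every delimited segment was acknowledged;
                // set bits mark the segments that would need to be re-sent on retry
                return failed;
            }
        }
    }
}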
+ * This method assumes content stream affinity where it is expected that the + * content stream that represents the same Kafka message(s) will remain the + * same across possible retries. This is required specifically for cases + * where delimiter is used and a single content stream may represent + * multiple Kafka messages. The failed segment list will keep the index of + * of each content stream segment that had failed to be sent to Kafka, so + * upon retry only the failed segments are sent. + * + * @param messageContext + * instance of {@link SplittableMessageContext} which hold + * context information about the message to be sent + * @param contentStream + * instance of open {@link InputStream} carrying the content of + * the message(s) to be send to Kafka + * @param partitionKey + * the value of the partition key. Only relevant is user wishes + * to provide a custom partition key instead of relying on + * variety of provided {@link Partitioner}(s) + * @return The set containing the failed segment indexes for messages that + * failed to be sent to Kafka. + */ + BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey) { + List> sendFutures = new ArrayList<>(); + BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments(); + int segmentCounter = 0; + StreamScanner scanner = new StreamScanner(contentStream, messageContext.getDelimiterPattern()); + + while (scanner.hasNext()) { + byte[] content = scanner.next(); + if (content.length > 0){ + byte[] key = messageContext.getKeyBytes(); + String topicName = messageContext.getTopicName(); + if (partitionKey == null && key != null) { + partitionKey = this.getPartition(key, topicName); + } + if (prevFailedSegmentIndexes == null || prevFailedSegmentIndexes.get(segmentCounter)) { + ProducerRecord message = new ProducerRecord(topicName, partitionKey, key, content); + sendFutures.add(this.toKafka(message)); + } + segmentCounter++; + } + } + scanner.close(); + return this.processAcks(sendFutures); + } + + /** + * + */ + private BitSet processAcks(List> sendFutures) { + int segmentCounter = 0; + BitSet failedSegments = new BitSet(); + for (Future future : sendFutures) { + try { + future.get(this.ackWaitTime, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + failedSegments.set(segmentCounter); + Thread.currentThread().interrupt(); + logger.warn("Interrupted while waiting for acks from Kafka"); + if (this.processLog != null) { + this.processLog.warn("Interrupted while waiting for acks from Kafka"); + } + } catch (ExecutionException e) { + failedSegments.set(segmentCounter); + logger.error("Failed while waiting for acks from Kafka", e); + if (this.processLog != null) { + this.processLog.error("Failed while waiting for acks from Kafka", e); + } + } catch (TimeoutException e) { + failedSegments.set(segmentCounter); + logger.warn("Timed out while waiting for acks from Kafka"); + if (this.processLog != null) { + this.processLog.warn("Timed out while waiting for acks from Kafka"); + } + } + segmentCounter++; + } + return failedSegments; + } + + /** + * + */ + private int getPartition(Object key, String topicName) { + int partSize = this.producer.partitionsFor(topicName).size(); + return this.partitioner.partition(key, partSize); + } + + /** + * Closes {@link KafkaProducer} + */ + @Override + public void close() throws Exception { + this.producer.close(); + } + + /** + * Sends the provided {@link KeyedMessage} to Kafka async returning + * {@link Future} + */ + private Future 
toKafka(ProducerRecord message) { + if (logger.isDebugEnabled()) { + logger.debug("Publishing message to '" + message.topic() + "' topic."); + } + return this.producer.send(message); + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java new file mode 100644 index 0000000000..2a851a4327 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.util.Random; + +import kafka.producer.Partitioner; + +/** + * Collection of implementation of common Kafka {@link Partitioner}s. + */ +final public class Partitioners { + + private Partitioners() { + } + /** + * {@link Partitioner} that implements 'round-robin' mechanism which evenly + * distributes load between all available partitions. + */ + public static class RoundRobinPartitioner implements Partitioner { + private volatile int index; + + @Override + public int partition(Object key, int numberOfPartitions) { + int partitionIndex = this.next(numberOfPartitions); + return partitionIndex; + } + + private int next(int numberOfPartitions) { + if (index == numberOfPartitions) { + index = 0; + } + int indexToReturn = index++; + return indexToReturn; + } + } + + /** + * {@link Partitioner} that implements 'random' mechanism which randomly + * distributes the load between all available partitions. + */ + public static class RandomPartitioner implements Partitioner { + private final Random random; + + public RandomPartitioner() { + this.random = new Random(); + } + + @Override + public int partition(Object key, int numberOfPartitions) { + return this.random.nextInt(numberOfPartitions); + } + } + + /** + * {@link Partitioner} that implements 'key hash' mechanism which + * distributes the load between all available partitions based on hashing + * the value of the key. 
+ */ + public static class HashPartitioner implements Partitioner { + + @Override + public int partition(Object key, int numberOfPartitions) { + if (key != null) { + return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions; + } + return 0; + } + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index f91099eab2..4510038cd2 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,42 +20,23 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; -import org.apache.kafka.clients.producer.BufferExhaustedException; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -65,326 +46,419 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import 
org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.ByteArrayOutputStream; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; -import org.apache.nifi.util.LongHolder; + +import kafka.producer.DefaultPartitioner; @InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) +@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" }) @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a " - + "user-specified delimiter, such as a new-line.") -@TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed + + "user-specified delimiter, such as a new-line.") @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", - description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" + " overriden with warning message describing the override." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") -public class PutKafka extends AbstractSessionFactoryProcessor { +public class PutKafka extends AbstractProcessor { private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; + private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to" - + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); - public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" - + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" - + " but can result in data loss if a Kafka node crashes"); - public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after" - + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" - + " in data loss."); - + public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", + "FlowFile will be routed to" + + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); + public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", + "FlowFile will be routed" + + " to success if the message is received by a single Kafka node, whether or not it is replicated. 
This is faster than" + + " but can result in data loss if a Kafka node crashes"); + public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", + "FlowFile will be routed to success after" + + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + + " in data loss."); /** * AllowableValue for sending messages to Kafka without compression */ - public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic."); + public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", + "Compression will not be used for any topic."); /** * AllowableValue for sending messages to Kafka with GZIP compression */ - public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP"); + public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", + "Compress messages using GZIP"); /** * AllowableValue for sending messages to Kafka with Snappy compression */ - public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); + public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", + "Compress messages using Snappy"); static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", - "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next Partition to Partition 2, and so on, wrapping as necessary."); + "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, " + + "the next Partition to Partition 2, and so on, wrapping as necessary."); static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", - "Messages will be assigned to random partitions."); + "Messages will be assigned to random partitions."); static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", - "The property will be used to determine the partition. All messages within the same FlowFile will be assigned to the same partition."); - + "The property will be used to determine the partition. 
All messages within the same FlowFile will be " + + "assigned to the same partition."); public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() - .name("Known Brokers") - .description("A comma-separated list of known Kafka Brokers in the format :") - .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) - .expressionLanguageSupported(false) - .build(); + .name("Known Brokers") + .description("A comma-separated list of known Kafka Brokers in the format :") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic of interest") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Topic Name") + .description("The Kafka Topic of interest") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() - .name("Partition Strategy") - .description("Specifies how messages should be partitioned when sent to Kafka") - .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) - .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) - .required(true) - .build(); + .name("Partition Strategy") + .description("Specifies how messages should be partitioned when sent to Kafka") + .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) + .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) + .required(true) + .build(); public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() - .name("Partition") - .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. " - + "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .build(); + .name("Partition") + .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages " + + "in the same FlowFile will be sent to the same partition. 
If a partition is specified but is not valid, " + + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("Kafka Key") - .description("The Key to use for the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Kafka Key") + .description("The Key to use for the Message") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") - .required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); + .name("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka").required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. " - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message. Note that if messages are delimited and some messages for a given FlowFile " - + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those " - + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' " - + "relationship.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, " + + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka " + + "message. 
Note that if messages are delimited and some messages for a given FlowFile are transferred " + + "successfully while others are not, the messages will be split into individual FlowFiles, such that those " + + "messages that were successfully sent are routed to the 'success' relationship while other messages are " + + "sent to the 'failure' relationship.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data to buffer in memory before sending to Kafka") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("5 MB") - .build(); + .name("Max Buffer Size") + .description("The maximum amount of data to buffer in memory before sending to Kafka") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("5 MB") + .build(); static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() - .name("Max Record Size") - .description("The maximum size that any individual record can be.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .required(true) - .defaultValue("1 MB") - .build(); + .name("Max Record Size") + .description("The maximum size that any individual record can be.") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR).required(true) + .defaultValue("1 MB") + .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); + .name("Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs").build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() - .name("Async Batch Size") - .displayName("Batch Size") - .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready" - + " to send or \"Queue Buffering Max Time\" is reached.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("200") - .build(); + .name("Async Batch Size").displayName("Batch Size") + .description("The number of messages to send in one batch. The producer will wait until either this number of messages are ready " + + "to send or \"Queue Buffering Max Time\" is reached. 
NOTE: This property will be ignored unless the 'Message Delimiter' " + + "property is specified.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("200") + .build(); public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() - .name("Queue Buffering Max Time") - .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms" - + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve" - + " throughput but adds message delivery latency due to the buffering.") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("5 secs") - .build(); + .name("Queue Buffering Max Time") + .description("Maximum time to buffer data before sending to Kafka. For example a setting of 100 ms" + + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve" + + " throughput but adds message delivery latency due to the buffering.") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("5 secs") + .build(); public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() - .name("Compression Codec") - .description("This parameter allows you to specify the compression codec for all" - + " data generated by this producer.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) - .defaultValue(COMPRESSION_CODEC_NONE.getValue()) - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") - .build(); - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); - - private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); - private final BlockingQueue completeBatches = new LinkedBlockingQueue<>(); - private final Set activeBatches = Collections.synchronizedSet(new HashSet()); - - private final ConcurrentMap partitionIndexMap = new ConcurrentHashMap<>(); - - private volatile Producer producer; - - private volatile ExecutorService executor; - private volatile long deadlockTimeout; - - @Override - protected List getSupportedPropertyDescriptors() { - final PropertyDescriptor clientName = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) + .name("Compression Codec") + .description("This parameter allows you to specify the compression codec for all" + + " data generated by this producer.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) + .defaultValue(COMPRESSION_CODEC_NONE.getValue()) .build(); - final List props = new ArrayList<>(); - props.add(SEED_BROKERS); - props.add(TOPIC); - props.add(PARTITION_STRATEGY); - props.add(PARTITION); - props.add(KEY); - props.add(DELIVERY_GUARANTEE); - props.add(MESSAGE_DELIMITER); - props.add(MAX_BUFFER_SIZE); - props.add(MAX_RECORD_SIZE); - props.add(TIMEOUT); - props.add(BATCH_NUM_MESSAGES); - props.add(QUEUE_BUFFERING_MAX); - props.add(COMPRESSION_CODEC); - props.add(clientName); - return 
props; + // Relationships + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") + .build(); + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); + + protected static final String ATTR_PROC_ID = "PROC_ID"; + + protected static final String ATTR_FAILED_SEGMENTS = "FS"; + + protected static final String ATTR_TOPIC = "TOPIC"; + + protected static final String ATTR_KEY = "KEY"; + + protected static final String ATTR_DELIMITER = "DELIMITER"; + + private volatile KafkaPublisher kafkaPublisher; + + private static final List propertyDescriptors; + + private static final Set relationships; + + static { + List _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.add(SEED_BROKERS); + _propertyDescriptors.add(TOPIC); + _propertyDescriptors.add(PARTITION_STRATEGY); + _propertyDescriptors.add(PARTITION); + _propertyDescriptors.add(KEY); + _propertyDescriptors.add(DELIVERY_GUARANTEE); + _propertyDescriptors.add(MESSAGE_DELIMITER); + _propertyDescriptors.add(MAX_BUFFER_SIZE); + _propertyDescriptors.add(MAX_RECORD_SIZE); + _propertyDescriptors.add(TIMEOUT); + _propertyDescriptors.add(BATCH_NUM_MESSAGES); + _propertyDescriptors.add(QUEUE_BUFFERING_MAX); + _propertyDescriptors.add(COMPRESSION_CODEC); + _propertyDescriptors.add(CLIENT_NAME); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + Set _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); } + /** + * + */ + @OnScheduled + public void createKafkaPublisher(ProcessContext context) { + this.kafkaPublisher = new KafkaPublisher(this.buildKafkaConfigProperties(context)); + this.kafkaPublisher.setProcessLog(this.getLogger()); + } + + /** + * + */ + @Override + public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile != null) { + final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session); + final Integer partitionKey = this.determinePartition(messageContext, context, flowFile); + final AtomicReference failedSegmentsRef = new AtomicReference(); + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(InputStream contentStream) throws IOException { + failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey)); + } + }); + + if (failedSegmentsRef.get().isEmpty()) { + session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName()); + flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session); + session.transfer(flowFile, REL_SUCCESS); + } else { + flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext)); + session.transfer(flowFile, REL_FAILURE); + } + + } else { + context.yield(); + } + } + + @OnStopped + public void cleanup() { + try { + this.kafkaPublisher.close(); + } catch (Exception e) { + getLogger().warn("Failed while closing KafkaPublisher", e); + } + } @Override public Set getRelationships() { - final Set relationships = new HashSet<>(1); - relationships.add(REL_SUCCESS); - 
relationships.add(REL_FAILURE); return relationships; } + @Override + protected List getSupportedPropertyDescriptors() { + return propertyDescriptors; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") + .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) + .build(); + } + @Override protected Collection customValidate(final ValidationContext validationContext) { final List results = new ArrayList<>(); final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); - if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) { - results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation( - "The property must be set when configured to use the User-Defined Partitioning Strategy").build()); + if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) + && !validationContext.getProperty(PARTITION).isSet()) { + results.add(new ValidationResult.Builder().subject("Partition").valid(false) + .explanation("The property must be set when configured to use the User-Defined Partitioning Strategy") + .build()); } - return results; } - protected Producer getProducer() { - return producer; + /** + * + */ + private FlowFile cleanUpFlowFileIfNecessary(FlowFile flowFile, ProcessSession session) { + if (flowFile.getAttribute(ATTR_FAILED_SEGMENTS) != null) { + flowFile = session.removeAttribute(flowFile, ATTR_FAILED_SEGMENTS); + flowFile = session.removeAttribute(flowFile, ATTR_KEY); + flowFile = session.removeAttribute(flowFile, ATTR_TOPIC); + flowFile = session.removeAttribute(flowFile, ATTR_DELIMITER); + flowFile = session.removeAttribute(flowFile, ATTR_PROC_ID); + } + return flowFile; } - @OnStopped - public void cleanup() { - final Producer producer = getProducer(); - if (producer != null) { - producer.close(); - } - - for (final FlowFileMessageBatch batch : activeBatches) { - batch.cancelOrComplete(); - } - if (this.executor != null) { - this.executor.shutdown(); - try { - if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { - this.executor.shutdownNow(); - getLogger().warn("Executor did not stop in 30 sec. 
Terminated."); - } - this.executor = null; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + /** + * + */ + private Integer determinePartition(SplittableMessageContext messageContext, ProcessContext context, FlowFile flowFile) { + String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); + Integer partitionValue = null; + if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) { + String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); + if (pv != null){ + partitionValue = Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue()); } } + return partitionValue; } - @OnScheduled - public void createProducer(final ProcessContext context) { - this.deadlockTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; - if (this.executor == null || this.executor.isShutdown()) { - this.executor = Executors.newCachedThreadPool(); + /** + * + */ + private Map buildFailedFlowFileAttributes(BitSet failedSegments, SplittableMessageContext messageContext) { + Map attributes = new HashMap<>(); + attributes.put(ATTR_PROC_ID, this.getIdentifier()); + attributes.put(ATTR_FAILED_SEGMENTS, new String(failedSegments.toByteArray(), StandardCharsets.UTF_8)); + attributes.put(ATTR_TOPIC, messageContext.getTopicName()); + attributes.put(ATTR_KEY, messageContext.getKeyBytesAsString()); + attributes.put(ATTR_DELIMITER, messageContext.getDelimiterPattern()); + return attributes; + } + + /** + * + */ + private SplittableMessageContext buildMessageContext(FlowFile flowFile, ProcessContext context, ProcessSession session) { + String topicName; + byte[] key; + String delimiterPattern; + + String failedSegmentsString = flowFile.getAttribute(ATTR_FAILED_SEGMENTS); + if (flowFile.getAttribute(ATTR_PROC_ID) != null && flowFile.getAttribute(ATTR_PROC_ID).equals(this.getIdentifier()) && failedSegmentsString != null) { + topicName = flowFile.getAttribute(ATTR_TOPIC); + key = flowFile.getAttribute(ATTR_KEY).getBytes(); + delimiterPattern = flowFile.getAttribute(ATTR_DELIMITER); + } else { + failedSegmentsString = null; + topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + key = _key == null ? 
null : _key.getBytes(StandardCharsets.UTF_8); + delimiterPattern = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); } - producer = new KafkaProducer(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer()); + SplittableMessageContext messageContext = new SplittableMessageContext(topicName, key, delimiterPattern); + if (failedSegmentsString != null) { + messageContext.setFailedSegmentsAsByteArray(failedSegmentsString.getBytes()); + } + return messageContext; } - protected int getActiveMessageBatchCount() { - return activeBatches.size(); - } - - protected int getCompleteMessageBatchCount() { - return completeBatches.size(); - } - - protected Properties createConfig(final ProcessContext context) { - final String brokers = context.getProperty(SEED_BROKERS).getValue(); - - final Properties properties = new Properties(); - properties.setProperty("bootstrap.servers", brokers); + /** + * + */ + private Properties buildKafkaConfigProperties(final ProcessContext context) { + Properties properties = new Properties(); + String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); + properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue()); properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); + properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue())); + properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue()); + if (context.getProperty(MESSAGE_DELIMITER).isSet()) { + properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); + } else { + properties.setProperty("batch.size", "1"); + } + properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); - - final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); - properties.setProperty("timeout.ms", timeout); - properties.setProperty("metadata.fetch.timeout.ms", timeout); - - properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); - properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue())); - - final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); - properties.setProperty("buffer.memory", String.valueOf(maxBufferSize)); - - final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); - properties.setProperty("compression.type", compressionCodec); - - final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); + Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); if (queueBufferingMillis != null) { properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis)); } + properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue())); + properties.setProperty("timeout.ms", timeout); + properties.setProperty("metadata.fetch.timeout.ms", timeout); - properties.setProperty("retries", "0"); - properties.setProperty("block.on.buffer.full", "false"); + String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); + String partitionerClass = null; + if 
(partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { + partitionerClass = Partitioners.RoundRobinPartitioner.class.getName(); + } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { + partitionerClass = DefaultPartitioner.class.getName(); + } + properties.setProperty("partitioner.class", partitionerClass); + // Set Dynamic Properties for (final Entry entry : context.getProperties().entrySet()) { PropertyDescriptor descriptor = entry.getKey(); if (descriptor.isDynamic()) { @@ -396,494 +470,6 @@ public class PutKafka extends AbstractSessionFactoryProcessor { properties.setProperty(descriptor.getName(), entry.getValue()); } } - return properties; } - - private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) { - final long unnormalizedIndex; - - final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); - if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { - AtomicLong partitionIndex = partitionIndexMap.get(topic); - if (partitionIndex == null) { - partitionIndex = new AtomicLong(0L); - final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex); - if (existing != null) { - partitionIndex = existing; - } - } - - unnormalizedIndex = partitionIndex.getAndIncrement(); - } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { - return null; - } else { - if (context.getProperty(PARTITION).isSet()) { - final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); - - if (NUMBER_PATTERN.matcher(partitionValue).matches()) { - // Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions. 
- unnormalizedIndex = Long.parseLong(partitionValue) - 1; - } else { - unnormalizedIndex = partitionValue.hashCode(); - } - } else { - return null; - } - } - - final Producer producer = getProducer(); - final List partitionInfos = producer.partitionsFor(topic); - final int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size()); - return partitionInfos.get(partitionIdx).partition(); - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") - .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) - .build(); - } - - - @Override - public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { - FlowFileMessageBatch batch; - while ((batch = completeBatches.poll()) != null) { - batch.completeSession(); - } - - final ProcessSession session = sessionFactory.createSession(); - final FlowFile flowFile = session.get(); - if (flowFile != null){ - Future consumptionFuture = this.executor.submit(new Callable() { - @Override - public Void call() throws Exception { - doOnTrigger(context, session, flowFile); - return null; - } - }); - try { - consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - consumptionFuture.cancel(true); - Thread.currentThread().interrupt(); - getLogger().warn("Interrupted while sending messages", e); - } catch (ExecutionException e) { - throw new IllegalStateException(e); - } catch (TimeoutException e) { - consumptionFuture.cancel(true); - getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e); - } - } else { - context.yield(); - } - } - - private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException { - final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); - final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); - String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); - if (delimiter != null) { - delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - } - - final Producer producer = getProducer(); - - if (delimiter == null) { - // Send the entire FlowFile as a single message. 
- final byte[] value = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, value); - } - }); - - final Integer partition; - try { - partition = getPartition(context, flowFile, topic); - } catch (final Exception e) { - getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - session.commit(); - return; - } - - final ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value); - - final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); - messageBatch.setNumMessages(1); - activeBatches.add(messageBatch); - - try { - producer.send(producerRecord, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception == null) { - // record was successfully sent. - messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset()); - } else { - messageBatch.addFailedRange(0L, flowFile.getSize(), exception); - } - } - }); - } catch (final BufferExhaustedException bee) { - messageBatch.addFailedRange(0L, flowFile.getSize(), bee); - context.yield(); - return; - } - } else { - final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); - - // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see - // if it matches some pattern. We can use this to search for the delimiter as we read through - // the stream of bytes in the FlowFile - final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); - - final LongHolder messagesSent = new LongHolder(0L); - final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); - activeBatches.add(messageBatch); - - try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - byte[] data = null; // contents of a single message - - boolean streamFinished = false; - - int nextByte; - try (final InputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - - long messageStartOffset = in.getBytesConsumed(); - - // read until we're out of data. - while (!streamFinished) { - nextByte = in.read(); - - if (nextByte > -1) { - baos.write(nextByte); - } - - if (nextByte == -1) { - // we ran out of data. This message is complete. - data = baos.toByteArray(); - streamFinished = true; - } else if (buffer.addAndCompare((byte) nextByte)) { - // we matched our delimiter. This message is complete. We want all of the bytes from the - // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want - // the delimiter itself to be sent. - data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length); - } - - if (data != null) { - final long messageEndOffset = in.getBytesConsumed(); - - // If the message has no data, ignore it. 
- if (data.length != 0) { - final Integer partition; - try { - partition = getPartition(context, flowFile, topic); - } catch (final Exception e) { - messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e); - getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); - continue; - } - - - final ProducerRecord producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data); - final long rangeStart = messageStartOffset; - - try { - producer.send(producerRecord, new Callback() { - @Override - public void onCompletion(final RecordMetadata metadata, final Exception exception) { - if (exception == null) { - // record was successfully sent. - messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset()); - } else { - messageBatch.addFailedRange(rangeStart, messageEndOffset, exception); - } - } - }); - - messagesSent.incrementAndGet(); - } catch (final BufferExhaustedException bee) { - // Not enough room in the buffer. Add from the beginning of this message to end of FlowFile as a failed range - messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee); - context.yield(); - return; - } - - } - - // reset BAOS so that we can start a new message. - baos.reset(); - data = null; - messageStartOffset = in.getBytesConsumed(); - } - } - } - } - }); - - messageBatch.setNumMessages(messagesSent.get()); - } - } - } - - - private static class Range { - private final long start; - private final long end; - private final Long kafkaOffset; - - public Range(final long start, final long end, final Long kafkaOffset) { - this.start = start; - this.end = end; - this.kafkaOffset = kafkaOffset; - } - - public long getStart() { - return start; - } - - public long getEnd() { - return end; - } - - public Long getKafkaOffset() { - return kafkaOffset; - } - - @Override - public String toString() { - return "Range[" + start + "-" + end + "]"; - } - } - - private class FlowFileMessageBatch { - private final ProcessSession session; - private final FlowFile flowFile; - private final String topic; - private final long startTime = System.nanoTime(); - - private final List successfulRanges = new ArrayList<>(); - private final List failedRanges = new ArrayList<>(); - - private Exception lastFailureReason; - private long numMessages = -1L; - private long completeTime = 0L; - private boolean canceled = false; - - public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) { - this.session = session; - this.flowFile = flowFile; - this.topic = topic; - } - - public synchronized void cancelOrComplete() { - if (isComplete()) { - completeSession(); - return; - } - - this.canceled = true; - - session.rollback(); - successfulRanges.clear(); - failedRanges.clear(); - } - - public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) { - if (canceled) { - return; - } - - successfulRanges.add(new Range(start, end, kafkaOffset)); - - if (isComplete()) { - activeBatches.remove(this); - completeBatches.add(this); - completeTime = System.nanoTime(); - } - } - - public synchronized void addFailedRange(final long start, final long end, final Exception e) { - if (canceled) { - return; - } - - failedRanges.add(new Range(start, end, null)); - lastFailureReason = e; - - if (isComplete()) { - activeBatches.remove(this); - completeBatches.add(this); - completeTime = System.nanoTime(); - } - } - - private boolean isComplete() { - return !canceled && (numMessages > -1) && 
(successfulRanges.size() + failedRanges.size() >= numMessages); - } - - public synchronized void setNumMessages(final long msgCount) { - this.numMessages = msgCount; - - if (isComplete()) { - activeBatches.remove(this); - completeBatches.add(this); - completeTime = System.nanoTime(); - } - } - - private Long getMin(final Long a, final Long b) { - if (a == null && b == null) { - return null; - } - - if (a == null) { - return b; - } - - if (b == null) { - return a; - } - - return Math.min(a, b); - } - - private Long getMax(final Long a, final Long b) { - if (a == null && b == null) { - return null; - } - - if (a == null) { - return b; - } - - if (b == null) { - return a; - } - - return Math.max(a, b); - } - - private void transferRanges(final List ranges, final Relationship relationship) { - Collections.sort(ranges, new Comparator() { - @Override - public int compare(final Range o1, final Range o2) { - return Long.compare(o1.getStart(), o2.getStart()); - } - }); - - for (int i = 0; i < ranges.size(); i++) { - Range range = ranges.get(i); - int count = 1; - Long smallestKafkaOffset = range.getKafkaOffset(); - Long largestKafkaOffset = range.getKafkaOffset(); - - while (i + 1 < ranges.size()) { - // Check if the next range in the List continues where this one left off. - final Range nextRange = ranges.get(i + 1); - - if (nextRange.getStart() == range.getEnd()) { - // We have two ranges in a row that are contiguous; combine them into a single Range. - range = new Range(range.getStart(), nextRange.getEnd(), null); - - smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset()); - largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset()); - count++; - i++; - } else { - break; - } - } - - // Create a FlowFile for this range. - FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart()); - if (relationship == REL_SUCCESS) { - session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); - session.transfer(child, relationship); - } else { - session.transfer(session.penalize(child), relationship); - } - } - } - - private String getTransitUri() { - final List partitions = getProducer().partitionsFor(topic); - if (partitions.isEmpty()) { - return "kafka://unknown-host" + "/topics/" + topic; - } - - final PartitionInfo info = partitions.get(0); - final Node leader = info.leader(); - final String host = leader.host(); - final int port = leader.port(); - - return "kafka://" + host + ":" + port + "/topics/" + topic; - } - - public synchronized void completeSession() { - if (canceled) { - return; - } - - if (successfulRanges.isEmpty() && failedRanges.isEmpty()) { - getLogger().info("Completed processing {} but sent 0 FlowFiles to Kafka", new Object[] {flowFile}); - session.transfer(flowFile, REL_SUCCESS); - session.commit(); - return; - } - - if (successfulRanges.isEmpty()) { - getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {};", new Object[] {flowFile, lastFailureReason}); - session.transfer(session.penalize(flowFile), REL_FAILURE); - session.commit(); - return; - } - - if (failedRanges.isEmpty()) { - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime); - - if (successfulRanges.size() == 1) { - final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset(); - final String msg = "Sent 1 message" + ((kafkaOffset == null) ? 
"" : ("; Kafka offset = " + kafkaOffset)); - session.getProvenanceReporter().send(flowFile, getTransitUri(), msg); - } else { - long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); - long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); - - for (final Range range : successfulRanges) { - smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset()); - largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset()); - } - - session.getProvenanceReporter().send(flowFile, getTransitUri(), - "Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); - } - - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis}); - session.commit(); - return; - } - - // At this point, the successful ranges is not empty and the failed ranges is not empty. This indicates that some messages made their way to Kafka - // successfully and some failed. We will address this by splitting apart the source FlowFile into children and sending the successful messages to 'success' - // and the failed messages to 'failure'. - transferRanges(successfulRanges, REL_SUCCESS); - transferRanges(failedRanges, REL_FAILURE); - session.remove(flowFile); - getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}", - new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason}); - session.commit(); - } - } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java new file mode 100644 index 0000000000..9967404483 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/SplittableMessageContext.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka; + +import java.util.BitSet; + +import org.apache.nifi.flowfile.FlowFile; + +/** + * Context object that serves as a bridge between the content of a FlowFile and + * Kafka message(s). It contains all necessary information to allow + * {@link KafkaPublisher} to determine how a each content of the + * {@link FlowFile} must be sent to Kafka. 
+ */
+final class SplittableMessageContext {
+    private final String topicName;
+
+    private final String delimiterPattern;
+
+    private final byte[] keyBytes;
+
+    private volatile BitSet failedSegments;
+
+    /**
+     * @param topicName
+     *            the name of the Kafka topic
+     * @param keyBytes
+     *            the instance of byte[] representing the key. Can be null.
+     * @param delimiterPattern
+     *            the string representing the delimiter regex pattern. Can be
+     *            null. For cases where it is null the EOF pattern will be used
+     *            - "(\\W)\\Z".
+     */
+    SplittableMessageContext(String topicName, byte[] keyBytes, String delimiterPattern) {
+        this.topicName = topicName;
+        this.keyBytes = keyBytes;
+        this.delimiterPattern = delimiterPattern != null ? delimiterPattern : "(\\W)\\Z";
+    }
+
+    /**
+     * Provides a human readable rendering of this context (topic and delimiter) for logging.
+     */
+    @Override
+    public String toString() {
+        return "topic: '" + topicName + "'; delimiter: '" + delimiterPattern + "'";
+    }
+
+    /**
+     * Records the indexes of the segments (chunks) that failed to be sent so they can be resent later.
+     */
+    void setFailedSegments(int... failedSegments) {
+        this.failedSegments = new BitSet();
+        for (int failedSegment : failedSegments) {
+            this.failedSegments.set(failedSegment);
+        }
+    }
+
+    /**
+     * Restores the failed segment indexes from their byte[] form (see {@link BitSet#valueOf(byte[])}).
+     */
+    void setFailedSegmentsAsByteArray(byte[] failedSegments) {
+        this.failedSegments = BitSet.valueOf(failedSegments);
+    }
+
+    /**
+     * Returns the {@link BitSet} in which each set bit identifies a segment (chunk) of the
+     * delimited content stream that had failed to be sent to Kafka topic.
+     */
+    BitSet getFailedSegments() {
+        return this.failedSegments;
+    }
+
+    /**
+     * Returns the name of the Kafka topic
+     */
+    String getTopicName() {
+        return this.topicName;
+    }
+
+    /**
+     * Returns the value of the delimiter regex pattern.
+     */
+    String getDelimiterPattern() {
+        return this.delimiterPattern;
+    }
+
+    /**
+     * Returns the key bytes as String, or null if no key was provided
+     */
+    String getKeyBytesAsString() {
+        return this.keyBytes == null ? null : new String(this.keyBytes);
+    }
+
+    /**
+     * Returns the key bytes
+     */
+    byte[] getKeyBytes() {
+        return this.keyBytes;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
new file mode 100644
index 0000000000..e959fddf5a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/StreamScanner.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+
+/**
+ * Scans an InputStream and splits it into byte[] segments on a delimiter; a lightweight replacement for java.util.Scanner.
+ */
+class StreamScanner {
+
+    private final InputStream is;
+
+    private final byte[] delimiter;
+
+    private final NonThreadSafeCircularBuffer buffer;
+
+    private final ByteArrayOutputStream baos;
+
+    private byte[] data;
+
+    private boolean eos;
+
+    /**
+     * Constructs a scanner that reads from the provided InputStream, splitting on the provided delimiter.
+     */
+    StreamScanner(InputStream is, String delimiter) {
+        this.is = new BufferedInputStream(is);
+        this.delimiter = delimiter.getBytes();
+        buffer = new NonThreadSafeCircularBuffer(this.delimiter);
+        baos = new ByteArrayOutputStream();
+    }
+
+    /**
+     * Reads the next segment, returning true if one is available via {@link #next()} and false once the end of the stream has been reached.
+     */
+    boolean hasNext() {
+        this.data = null;
+        if (!this.eos) {
+            try {
+                boolean keepReading = true;
+                while (keepReading) {
+                    int b = this.is.read();
+                    if (b > -1) {
+                        baos.write(b);
+                        if (buffer.addAndCompare((byte) b)) {
+                            this.data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiter.length);
+                            keepReading = false;
+                        }
+                    } else {
+                        this.data = baos.toByteArray();
+                        keepReading = false;
+                        this.eos = true;
+                    }
+                }
+                baos.reset();
+            } catch (Exception e) {
+                throw new IllegalStateException("Failed while reading InputStream", e);
+            }
+        }
+        return this.data != null;
+    }
+
+    /**
+     * Returns the segment produced by the most recent successful call to {@link #hasNext()}.
+     */
+    byte[] next() {
+        return this.data;
+    }
+
+    void close() {
+        this.baos.close();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
new file mode 100644
index 0000000000..92a63070e7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/KafkaPublisherTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+public class KafkaPublisherTest {
+
+    private static final String sampleData = "The true sign of intelligence is not knowledge but imagination.\n"
+            + "It's not that I'm so smart, it's just that I stay with problems longer.\n"
+            + "The only source of knowledge is experience.\n"
+            + "Only two things are infinite, the universe and human stupidity, and I'm not sure about the former.\n";
+
+    private static final String sampleData2 = "foo|bar|baz";
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, null);
+
+        publisher.publish(messageContext, fis, null);
+
+        fis.close();
+        publisher.close();
+
+        ConsumerIterator iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited2() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData2.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited2";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "|");
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    @Test
+    public void validateSuccessfulReSendOfFailedSegments() throws Exception {
+        InputStream fis = new ByteArrayInputStream(sampleData.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        SplittableMessageContext messageContext = new SplittableMessageContext(topicName, null, "\n");
+        messageContext.setFailedSegments(1, 3);
+
+        publisher.publish(messageContext, fis, null);
+        publisher.close();
+
+        ConsumerIterator iter = this.buildConsumer(topicName);
+        String m1 = new String(iter.next().message());
+        String m2 = new String(iter.next().message());
+        assertEquals("It's not that I'm so smart, it's just that I stay with problems longer.", m1);
+        assertEquals("Only two things are infinite, the universe and human stupidity, and I'm not 
sure about the former.", m2); + try { + iter.next(); + fail(); + } catch (ConsumerTimeoutException e) { + // that's OK since this is the Kafka mechanism to unblock + } + } + + private Properties buildProducerProperties() { + Properties kafkaProperties = new Properties(); + kafkaProperties.setProperty("bootstrap.servers", "0.0.0.0:" + kafkaLocal.getKafkaPort()); + kafkaProperties.setProperty("serializer.class", "kafka.serializer.DefaultEncoder"); + kafkaProperties.setProperty("acks", "1"); + kafkaProperties.put("auto.create.topics.enable", "true"); + kafkaProperties.setProperty("partitioner.class", "org.apache.nifi.processors.kafka.Partitioners$RoundRobinPartitioner"); + kafkaProperties.setProperty("timeout.ms", "5000"); + return kafkaProperties; + } + + private ConsumerIterator buildConsumer(String topic) { + Properties props = new Properties(); + props.put("zookeeper.connect", "localhost:" + kafkaLocal.getZookeeperPort()); + props.put("group.id", "test"); + props.put("consumer.timeout.ms", "5000"); + props.put("auto.offset.reset", "smallest"); + ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); + Map topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, 1); + Map>> consumerMap = consumer.createMessageStreams(topicCountMap); + List> streams = consumerMap.get(topic); + ConsumerIterator iter = streams.get(0).iterator(); + return iter; + } +} diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 2f5da5c068..3ed05498f7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -17,462 +17,189 @@ package org.apache.nifi.processors.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; +import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Future; +import java.util.Properties; -import org.apache.kafka.clients.producer.BufferExhaustedException; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.nifi.provenance.ProvenanceEventRecord; -import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.processors.kafka.test.EmbeddedKafka; +import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import 
org.junit.Assert; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; -import kafka.common.FailedToSendMessageException; +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; public class TestPutKafka { - @Test - public void testMultipleKeyValuePerFlowFile() { - final TestableProcessor proc = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + private static EmbeddedKafka kafkaLocal; - runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes()); - runner.run(2); // we have to run twice because the first iteration will result in data being added to a queue in the processor; the second onTrigger call will transfer FlowFiles. + private static EmbeddedKafkaProducerHelper producerHelper; + + @BeforeClass + public static void bforeClass() { + kafkaLocal = new EmbeddedKafka(); + kafkaLocal.start(); + producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal); + } + + @AfterClass + public static void afterClass() throws Exception { + producerHelper.close(); + kafkaLocal.stop(); + } + + @Test + public void testDelimitedMessagesWithKey() { + String topicName = "testDelimitedMessagesWithKey"; + PutKafka putKafka = new PutKafka(); + TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); + + runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5".getBytes()); + runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + ConsumerIterator consumer = this.buildConsumer(topicName); + assertEquals("Hello World", new String(consumer.next().message())); + assertEquals("Goodbye", new String(consumer.next().message())); + assertEquals("1", new String(consumer.next().message())); + assertEquals("2", new String(consumer.next().message())); + assertEquals("3", new String(consumer.next().message())); + assertEquals("4", new String(consumer.next().message())); + assertEquals("5", new String(consumer.next().message())); - final List> messages = ((MockProducer) proc.getProducer()).getMessages(); - assertEquals(11, messages.size()); - - assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0).value())); - assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1).value())); - - for (int i = 1; i <= 9; i++) { - assertTrue(Arrays.equals(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messages.get(i + 1).value())); - } + runner.shutdown(); } @Test - public void testWithImmediateFailure() { - final TestableProcessor proc = new TestableProcessor(0); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); + @Ignore + public void testWithFailureAndPartialResend() throws Exception { + String topicName = "testWithImmediateFailure"; + PutKafka putKafka = new PutKafka(); + final TestRunner runner = TestRunners.newTestRunner(putKafka); + 
runner.setProperty(PutKafka.TOPIC, topicName); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.SEED_BROKERS, "0.0.0.0:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9"; + final String text = "Hello World\nGoodbye\n1\n2"; runner.enqueue(text.getBytes()); - runner.run(2); + afterClass(); // kill Kafka right before send to ensure producer fails + runner.run(1, false); runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); - final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - mff.assertContentEquals(text); - } + MockFlowFile ff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); + String failedSegmentsStr = ff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS); + BitSet fs = BitSet.valueOf(failedSegmentsStr.getBytes()); + assertTrue(fs.get(0)); + assertTrue(fs.get(1)); + assertTrue(fs.get(2)); + assertTrue(fs.get(3)); + String delimiter = ff.getAttribute(PutKafka.ATTR_DELIMITER); + assertEquals("\n", delimiter); + String key = ff.getAttribute(PutKafka.ATTR_KEY); + assertEquals("key1", key); + String topic = ff.getAttribute(PutKafka.ATTR_TOPIC); + assertEquals(topicName, topic); - @Test - public void testPartialFailure() { - final TestableProcessor proc = new TestableProcessor(2); // fail after sending 2 messages. - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); + bforeClass(); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + Map attr = new HashMap<>(ff.getAttributes()); + /* + * So here we are emulating partial success. Basically even though all 4 + * messages failed to be sent by changing the ATTR_FAILED_SEGMENTS value + * we essentially saying that only two failed and need to be resent. 
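+             * (The failed segment indexes are carried in the ATTR_FAILED_SEGMENTS attribute as the
+             * String form of BitSet.toByteArray(), which is why the resend set is built as a BitSet below.)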
+ */ + BitSet _fs = new BitSet(); + _fs.set(1); + _fs.set(3); + attr.put(PutKafka.ATTR_FAILED_SEGMENTS, new String(_fs.toByteArray(), StandardCharsets.UTF_8)); + ff.putAttributes(attr); + runner.enqueue(ff); + runner.run(1, false); + MockFlowFile sff = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); + assertNull(sff.getAttribute(PutKafka.ATTR_FAILED_SEGMENTS)); + assertNull(sff.getAttribute(PutKafka.ATTR_TOPIC)); + assertNull(sff.getAttribute(PutKafka.ATTR_KEY)); + assertNull(sff.getAttribute(PutKafka.ATTR_DELIMITER)); - final byte[] bytes = "1\n2\n3\n4".getBytes(); - runner.enqueue(bytes); - runner.run(2); + ConsumerIterator consumer = this.buildConsumer(topicName); - runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); - runner.assertTransferCount(PutKafka.REL_FAILURE, 1); - - final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); - successFF.assertContentEquals("1\n2\n"); - - final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - failureFF.assertContentEquals("3\n4"); - } - - @Test - public void testPartialFailureWithSuccessBeforeAndAfter() { - final TestableProcessor proc = new TestableProcessor(2, 4); // fail after sending 2 messages, then stop failing after 4 - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); - - final byte[] bytes = "1\n2\n3\n4\n5\n6".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - runner.assertTransferCount(PutKafka.REL_SUCCESS, 2); - runner.assertTransferCount(PutKafka.REL_FAILURE, 1); - - final List success = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); - for (final MockFlowFile successFF : success) { - if ('1' == successFF.toByteArray()[0]) { - successFF.assertContentEquals("1\n2\n"); - } else if ('5' == successFF.toByteArray()[0]) { - successFF.assertContentEquals("5\n6"); - } else { - Assert.fail("Wrong content for FlowFile; contained " + new String(successFF.toByteArray())); - } + assertEquals("Goodbye", new String(consumer.next().message())); + assertEquals("2", new String(consumer.next().message())); + try { + consumer.next(); + fail(); + } catch (Exception e) { + // ignore } - - final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); - failureFF.assertContentEquals("3\n4\n"); } - @Test public void testWithEmptyMessages() { - final TestableProcessor proc = new TestableProcessor(); - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); + String topicName = "testWithEmptyMessages"; + PutKafka putKafka = new PutKafka(); + final TestRunner runner = TestRunners.newTestRunner(putKafka); + runner.setProperty(PutKafka.TOPIC, topicName); runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.CLIENT_NAME, "foo"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort()); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); - runner.run(2); + runner.run(1); runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - final List> 
msgs = ((MockProducer) proc.getProducer()).getMessages(); - assertEquals(4, msgs.size()); - - for (int i = 1; i <= 4; i++) { - assertTrue(Arrays.equals(String.valueOf(i).getBytes(), msgs.get(i - 1).value())); + ConsumerIterator consumer = this.buildConsumer(topicName); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + assertNotNull(consumer.next()); + try { + consumer.next(); + fail(); + } catch (Exception e) { + // ignore } } - @Test - public void testProvenanceReporterMessagesCount() { - final TestableProcessor processor = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(processor); - - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - final List events = runner.getProvenanceEvents(); - assertEquals(1, events.size()); - final ProvenanceEventRecord event = events.get(0); - assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); - assertTrue(event.getDetails().startsWith("Sent 4 messages")); - } - - @Test - public void testProvenanceReporterWithoutDelimiterMessagesCount() { - final TestableProcessor processor = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(processor); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - - final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); - runner.enqueue(bytes); - runner.run(2); - - final List events = runner.getProvenanceEvents(); - assertEquals(1, events.size()); - final ProvenanceEventRecord event = events.get(0); - assertEquals(ProvenanceEventType.SEND, event.getEventType()); - assertEquals("kafka://localhost:1111/topics/topic1", event.getTransitUri()); - } - - @Test - public void testRoundRobinAcrossMultipleMessages() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); - - runner.enqueue("hello".getBytes()); - runner.enqueue("there".getBytes()); - runner.enqueue("how are you".getBytes()); - runner.enqueue("today".getBytes()); - - runner.run(5); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4); - - final List> records = ((MockProducer) proc.getProducer()).getMessages(); - for (int i = 0; i < 3; i++) { - assertEquals(i + 1, records.get(i).partition().intValue()); - } - - assertEquals(1, records.get(3).partition().intValue()); - } - - @Test - public void testRoundRobinAcrossMultipleMessagesInSameFlowFile() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.ROUND_ROBIN_PARTITIONING); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - - 
runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes()); - - runner.run(2); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - final List> records = ((MockProducer) proc.getProducer()).getMessages(); - for (int i = 0; i < 3; i++) { - assertEquals(i + 1, records.get(i).partition().intValue()); - } - - assertEquals(1, records.get(3).partition().intValue()); - } - - - @Test - public void testUserDefinedPartition() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); - runner.setProperty(PutKafka.PARTITION, "${part}"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - - final Map attrs = new HashMap<>(); - attrs.put("part", "3"); - runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - - runner.run(2); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - final List> records = ((MockProducer) proc.getProducer()).getMessages(); - for (int i = 0; i < 4; i++) { - assertEquals(3, records.get(i).partition().intValue()); - } - } - - - - @Test - public void testUserDefinedPartitionWithInvalidValue() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING); - runner.setProperty(PutKafka.PARTITION, "${part}"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - - final Map attrs = new HashMap<>(); - attrs.put("part", "bogus"); - runner.enqueue("hello\nthere\nhow are you\ntoday".getBytes(), attrs); - - runner.run(2); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - - final List> records = ((MockProducer) proc.getProducer()).getMessages(); - // should all be the same partition, regardless of what partition it is. - final int partition = records.get(0).partition().intValue(); - - for (int i = 0; i < 4; i++) { - assertEquals(partition, records.get(i).partition().intValue()); - } - } - - - @Test - public void testFullBuffer() { - final TestableProcessor proc = new TestableProcessor(); - - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(PutKafka.TOPIC, "topic1"); - runner.setProperty(PutKafka.KEY, "key1"); - runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); - runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n"); - runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "5 B"); - proc.setMaxQueueSize(10L); // will take 4 bytes for key and 1 byte for value. 
- - runner.enqueue("1\n2\n3\n4\n".getBytes()); - runner.run(2); - - runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); - runner.assertTransferCount(PutKafka.REL_FAILURE, 1); - - runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0).assertContentEquals("1\n2\n"); - runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0).assertContentEquals("3\n4\n"); - } - - - /** - * Used to override the {@link #getProducer()} method so that we can enforce that our MockProducer is used - */ - private static class TestableProcessor extends PutKafka { - private final MockProducer producer; - - public TestableProcessor() { - this(null); - } - - public TestableProcessor(final Integer failAfter) { - this(failAfter, null); - } - - public TestableProcessor(final Integer failAfter, final Integer stopFailingAfter) { - producer = new MockProducer(); - producer.setFailAfter(failAfter); - producer.setStopFailingAfter(stopFailingAfter); - } - - @Override - protected Producer getProducer() { - return producer; - } - - public void setMaxQueueSize(final long bytes) { - producer.setMaxQueueSize(bytes); - } - } - - - /** - * We have our own Mock Producer, which is very similar to the Kafka-supplied one. However, with the Kafka-supplied - * Producer, we don't have the ability to tell it to fail after X number of messages; rather, we can only tell it - * to fail on the next message. Since we are sending multiple messages in a single onTrigger call for the Processor, - * this doesn't allow us to test failure conditions adequately. - */ - private static class MockProducer implements Producer { - - private int sendCount = 0; - private Integer failAfter; - private Integer stopFailingAfter; - private long queueSize = 0L; - private long maxQueueSize = Long.MAX_VALUE; - - private final List> messages = new ArrayList<>(); - - public MockProducer() { - } - - public void setMaxQueueSize(final long bytes) { - this.maxQueueSize = bytes; - } - - public List> getMessages() { - return messages; - } - - public void setFailAfter(final Integer successCount) { - failAfter = successCount; - } - - public void setStopFailingAfter(final Integer stopFailingAfter) { - this.stopFailingAfter = stopFailingAfter; - } - - @Override - public Future send(final ProducerRecord record) { - return send(record, null); - } - - @Override - public Future send(final ProducerRecord record, final Callback callback) { - sendCount++; - - final ByteArraySerializer serializer = new ByteArraySerializer(); - final int keyBytes = serializer.serialize(record.topic(), record.key()).length; - final int valueBytes = serializer.serialize(record.topic(), record.value()).length; - if (maxQueueSize - queueSize < keyBytes + valueBytes) { - throw new BufferExhaustedException("Queue size is " + queueSize + " but serialized message is " + (keyBytes + valueBytes)); - } - - queueSize += keyBytes + valueBytes; - - if (failAfter != null && sendCount > failAfter && ((stopFailingAfter == null) || (sendCount < stopFailingAfter + 1))) { - final Exception e = new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); - callback.onCompletion(null, e); - } else { - messages.add(record); - final RecordMetadata meta = new RecordMetadata(new TopicPartition(record.topic(), record.partition() == null ? 
1 : record.partition()), 0L, 0L); - callback.onCompletion(meta, null); - } - - // we don't actually look at the Future in the processor, so we can just return null - return null; - } - - @Override - public List partitionsFor(String topic) { - final Node leader = new Node(1, "localhost", 1111); - final Node node2 = new Node(2, "localhost-2", 2222); - final Node node3 = new Node(3, "localhost-3", 3333); - - final PartitionInfo partInfo1 = new PartitionInfo(topic, 1, leader, new Node[] {node2, node3}, new Node[0]); - final PartitionInfo partInfo2 = new PartitionInfo(topic, 2, leader, new Node[] {node2, node3}, new Node[0]); - final PartitionInfo partInfo3 = new PartitionInfo(topic, 3, leader, new Node[] {node2, node3}, new Node[0]); - - final List infos = new ArrayList<>(3); - infos.add(partInfo1); - infos.add(partInfo2); - infos.add(partInfo3); - return infos; - } - - @Override - public Map metrics() { - return Collections.emptyMap(); - } - - @Override - public void close() { - } + private ConsumerIterator buildConsumer(String topic) { + Properties props = new Properties(); + props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort()); + props.put("group.id", "test"); + props.put("consumer.timeout.ms", "5000"); + props.put("auto.offset.reset", "smallest"); + ConsumerConfig consumerConfig = new ConsumerConfig(props); + ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig); + Map topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, 1); + Map>> consumerMap = consumer.createMessageStreams(topicCountMap); + List> streams = consumerMap.get(topic); + ConsumerIterator iter = streams.get(0).iterator(); + return iter; } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties index 35778d8baa..8e37bb9c12 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/resources/log4j.properties @@ -12,12 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -log4j.rootCategory=WARN, stdout +log4j.rootCategory=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n -log4j.category.org.apache.nifi.processors.kafka=INFO -log4j.category.kafka=ERROR -#log4j.category.org.apache.nifi.startup=INFO +log4j.category.org.apache.nifi.processors.kafka=DEBUG