diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 63a816e75a..1b63a465cf 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -61,97 +61,101 @@ import org.apache.nifi.processor.util.StandardValidators; @SupportsBatching @CapabilityDescription("Fetches messages from Apache Kafka") @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) -@WritesAttributes({ @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), - @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), - @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), - @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1") }) +@WritesAttributes({ + @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), + @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If" + + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), + @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), + @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")}) public class GetKafka extends AbstractProcessor { + public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() - .name("ZooKeeper Connection String") - .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of : combinations. For example, host1:2181,host2:2181,host3:2188") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("ZooKeeper Connection String") + .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of :" + + " combinations. 
For example, host1:2181,host2:2181,host3:2188") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic to pull messages from") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Topic Name") + .description("The Kafka Topic to pull messages from") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() - .name("Zookeeper Commit Frequency") - .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("60 secs") - .build(); + .name("Zookeeper Commit Frequency") + .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will" + + " result in better overall performance but can result in more data duplication if a NiFi node is lost") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("60 secs") + .build(); public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder() - .name("ZooKeeper Communications Timeout") - .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); + .name("ZooKeeper Communications Timeout") + .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder() - .name("Kafka Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); + .name("Kafka Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " - + "concatenated together with the string placed between the content of each message. 
" - + "If the messages from Kafka should not be concatenated together, leave this value at 1.") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1") - .build(); + .name("Batch Size") + .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " + + "concatenated together with the string placed between the content of each message. " + + "If the messages from Kafka should not be concatenated together, leave this value at 1.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .build(); public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() - .name("Message Demarcator") - .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the " - + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, " - + "this value will be placed in between them.") - .required(true) - .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string - .expressionLanguageSupported(false) - .defaultValue("\\n") - .build(); + .name("Message Demarcator") + .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the " + + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, " + + "this value will be placed in between them.") + .required(true) + .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string + .expressionLanguageSupported(false) + .defaultValue("\\n") + .build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles that are created are routed to this relationship") - .build(); + .name("success") + .description("All FlowFiles that are created are routed to this relationship") + .build(); - private final BlockingQueue> streamIterators = new LinkedBlockingQueue<>(); private volatile ConsumerConnector consumer; final Lock interruptionLock = new ReentrantLock(); // guarded by interruptionLock private final Set interruptableThreads = new HashSet<>(); - + @Override protected List getSupportedPropertyDescriptors() { - final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) - .build(); - + final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); + final List props = new ArrayList<>(); props.add(ZOOKEEPER_CONNECTION_STRING); props.add(TOPIC); @@ -163,174 +167,174 @@ public class GetKafka extends AbstractProcessor { props.add(ZOOKEEPER_TIMEOUT); return props; } - + @Override public Set getRelationships() { final Set 
relationships = new HashSet<>(1); relationships.add(REL_SUCCESS); return relationships; } - + @OnScheduled public void createConsumers(final ProcessContext context) { - final String topic = context.getProperty(TOPIC).getValue(); - - final Map topicCountMap = new HashMap<>(1); - topicCountMap.put(topic, context.getMaxConcurrentTasks()); - - final Properties props = new Properties(); - props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); - props.setProperty("group.id", getIdentifier()); - props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); - props.setProperty("auto.commit.enable", "true"); // just be explicit - props.setProperty("auto.offset.reset", "smallest"); - - final ConsumerConfig consumerConfig = new ConsumerConfig(props); - consumer = Consumer.createJavaConsumerConnector(consumerConfig); - - final Map>> consumerMap = consumer.createMessageStreams(topicCountMap); - final List> streams = consumerMap.get(topic); - - this.streamIterators.clear(); - - for ( final KafkaStream stream : streams ) { - streamIterators.add(stream.iterator()); - } + final String topic = context.getProperty(TOPIC).getValue(); + + final Map topicCountMap = new HashMap<>(1); + topicCountMap.put(topic, context.getMaxConcurrentTasks()); + + final Properties props = new Properties(); + props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); + props.setProperty("group.id", getIdentifier()); + props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); + props.setProperty("auto.commit.enable", "true"); // just be explicit + props.setProperty("auto.offset.reset", "smallest"); + + final ConsumerConfig consumerConfig = new ConsumerConfig(props); + consumer = Consumer.createJavaConsumerConnector(consumerConfig); + + final Map>> consumerMap = consumer.createMessageStreams(topicCountMap); + final List> streams = consumerMap.get(topic); + + this.streamIterators.clear(); + + for (final KafkaStream stream : streams) { + streamIterators.add(stream.iterator()); + } } - + @OnStopped public void shutdownConsumer() { - if ( consumer != null ) { - try { - consumer.commitOffsets(); - } finally { - consumer.shutdown(); - } - } + if (consumer != null) { + try { + consumer.commitOffsets(); + } finally { + consumer.shutdown(); + } + } } - + @OnUnscheduled public void interruptIterators() { - // Kafka doesn't provide a non-blocking API for pulling messages. We can, however, - // interrupt the Threads. We do this when the Processor is stopped so that we have the - // ability to shutdown the Processor. - interruptionLock.lock(); - try { - for ( final Thread t : interruptableThreads ) { - t.interrupt(); - } - - interruptableThreads.clear(); - } finally { - interruptionLock.unlock(); - } + // Kafka doesn't provide a non-blocking API for pulling messages. We can, however, + // interrupt the Threads. We do this when the Processor is stopped so that we have the + // ability to shutdown the Processor. 
+ interruptionLock.lock(); + try { + for (final Thread t : interruptableThreads) { + t.interrupt(); + } + + interruptableThreads.clear(); + } finally { + interruptionLock.unlock(); + } } - + protected ConsumerIterator getStreamIterator() { return streamIterators.poll(); } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - ConsumerIterator iterator = getStreamIterator(); - if ( iterator == null ) { - return; - } - - final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); - final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); - final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); - final String topic = context.getProperty(TOPIC).getValue(); - - FlowFile flowFile = null; - try { - // add the current thread to the Set of those to be interrupted if processor stopped. - interruptionLock.lock(); - try { - interruptableThreads.add(Thread.currentThread()); - } finally { - interruptionLock.unlock(); - } - - final long start = System.nanoTime(); - flowFile = session.create(); - - final Map attributes = new HashMap<>(); + ConsumerIterator iterator = getStreamIterator(); + if (iterator == null) { + return; + } + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + final String topic = context.getProperty(TOPIC).getValue(); + + FlowFile flowFile = null; + try { + // add the current thread to the Set of those to be interrupted if processor stopped. + interruptionLock.lock(); + try { + interruptableThreads.add(Thread.currentThread()); + } finally { + interruptionLock.unlock(); + } + + final long start = System.nanoTime(); + flowFile = session.create(); + + final Map attributes = new HashMap<>(); attributes.put("kafka.topic", topic); int numMessages = 0; - for (int msgCount = 0; msgCount < batchSize; msgCount++) { - // if the processor is stopped, iterator.hasNext() will throw an Exception. - // In this case, we just break out of the loop. - try { - if ( !iterator.hasNext() ) { - break; - } - } catch (final Exception e) { - break; - } - - final MessageAndMetadata mam = iterator.next(); - if ( mam == null ) { - return; - } - - final byte[] key = mam.key(); - - if ( batchSize == 1 ) { - // the kafka.key, kafka.offset, and kafka.partition attributes are added only - // for a batch size of 1. - if ( key != null ) { - attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); - } - - attributes.put("kafka.offset", String.valueOf(mam.offset())); - attributes.put("kafka.partition", String.valueOf(mam.partition())); - } - - // add the message to the FlowFile's contents - final boolean firstMessage = (msgCount == 0); - flowFile = session.append(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - if ( !firstMessage ) { - out.write(demarcatorBytes); - } - out.write(mam.message()); - } - }); - numMessages++; - } - - // If we received no messages, remove the FlowFile. Otherwise, send to success. 
- if ( flowFile.getSize() == 0L ) { - session.remove(flowFile); - } else { - flowFile = session.putAllAttributes(flowFile, attributes); - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis); - getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis}); - session.transfer(flowFile, REL_SUCCESS); - } - } catch (final Exception e) { - getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e}); - if ( flowFile != null ) { - session.remove(flowFile); - } - } finally { - // Remove the current thread from the Set of Threads to interrupt. - interruptionLock.lock(); - try { - interruptableThreads.remove(Thread.currentThread()); - } finally { - interruptionLock.unlock(); - } - - // Add the iterator back to the queue - if ( iterator != null ) { - streamIterators.offer(iterator); - } - } + for (int msgCount = 0; msgCount < batchSize; msgCount++) { + // if the processor is stopped, iterator.hasNext() will throw an Exception. + // In this case, we just break out of the loop. + try { + if (!iterator.hasNext()) { + break; + } + } catch (final Exception e) { + break; + } + + final MessageAndMetadata mam = iterator.next(); + if (mam == null) { + return; + } + + final byte[] key = mam.key(); + + if (batchSize == 1) { + // the kafka.key, kafka.offset, and kafka.partition attributes are added only + // for a batch size of 1. + if (key != null) { + attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); + } + + attributes.put("kafka.offset", String.valueOf(mam.offset())); + attributes.put("kafka.partition", String.valueOf(mam.partition())); + } + + // add the message to the FlowFile's contents + final boolean firstMessage = (msgCount == 0); + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if (!firstMessage) { + out.write(demarcatorBytes); + } + out.write(mam.message()); + } + }); + numMessages++; + } + + // If we received no messages, remove the FlowFile. Otherwise, send to success. + if (flowFile.getSize() == 0L) { + session.remove(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis); + getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, numMessages, millis}); + session.transfer(flowFile, REL_SUCCESS); + } + } catch (final Exception e) { + getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e}); + if (flowFile != null) { + session.remove(flowFile); + } + } finally { + // Remove the current thread from the Set of Threads to interrupt. 
+ interruptionLock.lock(); + try { + interruptableThreads.remove(Thread.currentThread()); + } finally { + interruptionLock.unlock(); + } + + // Add the iterator back to the queue + if (iterator != null) { + streamIterators.offer(iterator); + } + } } - + } diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index e0b7588c41..44b65849af 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -61,95 +61,100 @@ import scala.actors.threadpool.Arrays; @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka") public class PutKafka extends AbstractProcessor { + private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - - public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); - public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than but can result in data loss if a Kafka node crashes"); - public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. 
This provides the best performance but may result in data loss."); - - public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() - .name("Known Brokers") - .description("A comma-separated list of known Kafka Brokers in the format :") - .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) - .expressionLanguageSupported(false) - .build(); - public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() - .name("Topic Name") - .description("The Kafka Topic of interest") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("Kafka Key") - .description("The Key to use for the Message") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() - .name("Delivery Guarantee") - .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") - .required(true) - .expressionLanguageSupported(false) - .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) - .defaultValue(DELIVERY_BEST_EFFORT.getValue()) - .build(); - public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() - .name("Message Delimiter") - .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " - + "If not specified, the entire content of the FlowFile will be used as a single message. " - + "If specified, the contents of the FlowFile will be split on this delimiter and each section " - + "sent as a separate Kafka message.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .build(); - public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Buffer Size") - .description("The maximum amount of data to buffer in memory before sending to Kafka") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1 MB") - .build(); - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("30 secs") - .build(); - public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() - .name("Client Name") - .description("Client Name to use when communicating with Kafka") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(false) - .build(); - + public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to" + + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); + public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" + + " to success if the message is received by a single Kafka node, whether or not it is 
replicated. This is faster than" + + " but can result in data loss if a Kafka node crashes"); + public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after" + + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" + + " in data loss."); + + public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() + .name("Known Brokers") + .description("A comma-separated list of known Kafka Brokers in the format :") + .required(true) + .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .expressionLanguageSupported(false) + .build(); + public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() + .name("Topic Name") + .description("The Kafka Topic of interest") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Kafka Key") + .description("The Key to use for the Message") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() + .name("Delivery Guarantee") + .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") + .required(true) + .expressionLanguageSupported(false) + .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) + .defaultValue(DELIVERY_BEST_EFFORT.getValue()) + .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. 
" + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate Kafka message.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Buffer Size") + .description("The maximum amount of data to buffer in memory before sending to Kafka") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1 MB") + .build(); + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("30 secs") + .build(); + public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() + .name("Client Name") + .description("Client Name to use when communicating with Kafka") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") - .build(); + .name("success") + .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") - .build(); + .name("failure") + .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") + .build(); private final BlockingQueue> producers = new LinkedBlockingQueue<>(); - + @Override protected List getSupportedPropertyDescriptors() { - final PropertyDescriptor clientName = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(CLIENT_NAME) - .defaultValue("NiFi-" + getIdentifier()) - .build(); - + final PropertyDescriptor clientName = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(CLIENT_NAME) + .defaultValue("NiFi-" + getIdentifier()) + .build(); + final List props = new ArrayList<>(); props.add(SEED_BROKERS); props.add(TOPIC); @@ -161,7 +166,7 @@ public class PutKafka extends AbstractProcessor { props.add(clientName); return props; } - + @Override public Set getRelationships() { final Set relationships = new HashSet<>(1); @@ -169,17 +174,16 @@ public class PutKafka extends AbstractProcessor { relationships.add(REL_FAILURE); return relationships; } - - + @OnStopped public void closeProducers() { - Producer producer; - - while ((producer = producers.poll()) != null) { - producer.close(); - } + Producer producer; + + while ((producer = producers.poll()) != null) { + producer.close(); + } } - + protected ProducerConfig createConfig(final ProcessContext context) { final String brokers = context.getProperty(SEED_BROKERS).getValue(); @@ -188,76 +192,76 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); 
properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue())); - + properties.setProperty("message.send.max.retries", "1"); properties.setProperty("producer.type", "sync"); - + return new ProducerConfig(properties); } - + protected Producer createProducer(final ProcessContext context) { - return new Producer<>(createConfig(context)); + return new Producer<>(createConfig(context)); } - + private Producer borrowProducer(final ProcessContext context) { - Producer producer = producers.poll(); - return producer == null ? createProducer(context) : producer; + Producer producer = producers.poll(); + return producer == null ? createProducer(context) : producer; } - + private void returnProducer(final Producer producer) { - producers.offer(producer); + producers.offer(producer); } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - final long start = System.nanoTime(); + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long start = System.nanoTime(); final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8); String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); - if ( delimiter != null ) { + if (delimiter != null) { delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); } - + final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); final Producer producer = borrowProducer(context); - - if ( delimiter == null ) { + + if (delimiter == null) { // Send the entire FlowFile as a single message. 
final byte[] value = new byte[(int) flowFile.getSize()]; session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, value); - } + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, value); + } }); - + boolean error = false; try { final KeyedMessage message; - if ( key == null ) { + if (key == null) { message = new KeyedMessage<>(topic, value); } else { message = new KeyedMessage<>(topic, keyBytes, value); } - + producer.send(message); final long nanos = System.nanoTime() - start; - + session.getProvenanceReporter().send(flowFile, "kafka://" + topic); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); } catch (final Exception e) { - getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e}); + getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, e}); session.transfer(flowFile, REL_FAILURE); error = true; } finally { - if ( error ) { + if (error) { producer.close(); } else { returnProducer(producer); @@ -265,53 +269,53 @@ public class PutKafka extends AbstractProcessor { } } else { final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); - + // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see // if it matches some pattern. We can use this to search for the delimiter as we read through // the stream of bytes in the FlowFile final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); - + boolean error = false; final LongHolder lastMessageOffset = new LongHolder(0L); final LongHolder messagesSent = new LongHolder(0L); - + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { session.read(flowFile, new InputStreamCallback() { @Override public void process(final InputStream rawIn) throws IOException { byte[] data = null; // contents of a single message - + boolean streamFinished = false; - + final List> messages = new ArrayList<>(); // batch to send long messageBytes = 0L; // size of messages in the 'messages' list - + int nextByte; try (final InputStream bufferedIn = new BufferedInputStream(rawIn); - final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { - + final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + // read until we're out of data. while (!streamFinished) { nextByte = in.read(); - if ( nextByte > -1 ) { + if (nextByte > -1) { baos.write(nextByte); } - + if (nextByte == -1) { // we ran out of data. This message is complete. data = baos.toByteArray(); streamFinished = true; - } else if ( buffer.addAndCompare((byte) nextByte) ) { + } else if (buffer.addAndCompare((byte) nextByte)) { // we matched our delimiter. This message is complete. We want all of the bytes from the // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want // the delimiter itself to be sent. data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length); } - - if ( data != null ) { + + if (data != null) { // If the message has no data, ignore it. 
- if ( data.length != 0 ) { + if (data.length != 0) { // either we ran out of data or we reached the end of the message. // Either way, create the message because it's ready to send. final KeyedMessage message; @@ -361,7 +365,7 @@ public class PutKafka extends AbstractProcessor { } // If there are messages left, send them - if ( !messages.isEmpty() ) { + if (!messages.isEmpty()) { try { messagesSent.addAndGet(messages.size()); // add count of messages producer.send(messages); @@ -372,44 +376,45 @@ public class PutKafka extends AbstractProcessor { } } }); - + final long nanos = System.nanoTime() - start; session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages"); session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[]{messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); } catch (final ProcessException pe) { error = true; - + // There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to // 'success' while we send the others to 'failure'. final long offset = lastMessageOffset.get(); - if ( offset == 0L ) { + if (offset == 0L) { // all of the messages failed to send. Route FlowFile to failure - getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] {flowFile, pe.getCause()}); + getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[]{flowFile, pe.getCause()}); session.transfer(flowFile, REL_FAILURE); } else { // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); - - getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] { - messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); - + + getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into" + + " two: {} routed to 'success', {} routed to 'failure'. 
Failure was due to {}", new Object[]{ + messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause()}); + session.transfer(successfulMessages, REL_SUCCESS); session.transfer(failedMessages, REL_FAILURE); session.remove(flowFile); session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); } } finally { - if ( error ) { + if (error) { producer.close(); } else { returnProducer(producer); } } - + } } - + } diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html index 60611b6003..10c7082351 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/additionalDetails.html @@ -24,22 +24,22 @@
Description:
- This Processors polls Apache Kafka
- for data. When a message is received from Kafka, this Processor emits a FlowFile
- where the content of the FlowFile is the value of the Kafka message. If the
- message has a key associated with it, an attribute named kafka.key
- will be added to the FlowFile, with the value being the UTF-8 Encoded value
- of the Message's Key.
+ This Processors polls Apache Kafka
+ for data. When a message is received from Kafka, this Processor emits a FlowFile
+ where the content of the FlowFile is the value of the Kafka message. If the
+ message has a key associated with it, an attribute named kafka.key
+ will be added to the FlowFile, with the value being the UTF-8 Encoded value
+ of the Message's Key.
- Kafka supports the notion of a Consumer Group when pulling messages in order to
- provide scalability while still offering a publish-subscribe interface. Each
- Consumer Group must have a unique identifier. The Consumer Group identifier that
- is used by NiFi is the UUID of the Processor. This means that all of the nodes
- within a cluster will use the same Consumer Group Identifier so that they do
- not receive duplicate data but multiple GetKafka Processors can be used to pull
- from multiple Topics, as each Processor will receive a different Processor UUID
- and therefore a different Consumer Group Identifier.
+ Kafka supports the notion of a Consumer Group when pulling messages in order to
+ provide scalability while still offering a publish-subscribe interface. Each
+ Consumer Group must have a unique identifier. The Consumer Group identifier that
+ is used by NiFi is the UUID of the Processor. This means that all of the nodes
+ within a cluster will use the same Consumer Group Identifier so that they do
+ not receive duplicate data but multiple GetKafka Processors can be used to pull
+ from multiple Topics, as each Processor will receive a different Processor UUID
+ and therefore a different Consumer Group Identifier.
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
index 04d9463ed6..d51ce95bfd 100644
--- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
+++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/additionalDetails.html
@@ -24,22 +24,22 @@
Description:
- This Processors puts the contents of a FlowFile to a Topic in
- Apache Kafka. The full contents of
- a FlowFile becomes the contents of a single message in Kafka.
- This message is optionally assigned a key by using the
- <Kafka Key> Property.
+ This Processors puts the contents of a FlowFile to a Topic in
+ Apache Kafka. The full contents of
+ a FlowFile becomes the contents of a single message in Kafka.
+ This message is optionally assigned a key by using the
+ <Kafka Key> Property.
-
- The Processor allows the user to configure an optional Message Delimiter that
- can be used to send many messages per FlowFile. For example, a \n could be used
- to indicate that the contents of the FlowFile should be used to send one message
- per line of text. If the property is not set, the entire contents of the FlowFile
- will be sent as a single message. When using the delimiter, if some messages are
- successfully sent but other messages fail to send, the FlowFile will be FORKed into
- two child FlowFiles, with the successfully sent messages being routed to 'success'
- and the messages that could not be sent going to 'failure'.
-
+
+ The Processor allows the user to configure an optional Message Delimiter that
+ can be used to send many messages per FlowFile. For example, a \n could be used
+ to indicate that the contents of the FlowFile should be used to send one message
+ per line of text. If the property is not set, the entire contents of the FlowFile
+ will be sent as a single message. When using the delimiter, if some messages are
+ successfully sent but other messages fail to send, the FlowFile will be FORKed into
+ two child FlowFiles, with the successfully sent messages being routed to 'success'
+ and the messages that could not be sent going to 'failure'.
+
diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java index 10560f8e39..69ff48cbe4 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java @@ -37,14 +37,13 @@ import org.mockito.stubbing.Answer; public class TestGetKafka { - @BeforeClass public static void configureLogging() { - System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO"); + System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO"); System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO"); BasicConfigurator.configure(); } - + @Test @Ignore("Intended only for local tests to verify functionality.") public void testIntegrationLocally() { @@ -53,24 +52,23 @@ public class TestGetKafka { runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs"); - + runner.run(20, false); - + final List flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS); - for ( final MockFlowFile flowFile : flowFiles ) { - System.out.println(flowFile.getAttributes()); - System.out.println(new String(flowFile.toByteArray())); - System.out.println(); + for (final MockFlowFile flowFile : flowFiles) { + System.out.println(flowFile.getAttributes()); + System.out.println(new String(flowFile.toByteArray())); + System.out.println(); } } - - + @Test public void testWithDelimiter() { final List messages = new ArrayList<>(); messages.add("Hello"); messages.add("Good-bye"); - + final TestableProcessor proc = new TestableProcessor(null, messages); final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); @@ -79,20 +77,20 @@ public class TestGetKafka { runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs"); runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n"); runner.setProperty(GetKafka.BATCH_SIZE, "2"); - + runner.run(); - + runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1); final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0); mff.assertContentEquals("Hello\nGood-bye"); } - + @Test public void testWithDelimiterAndNotEnoughMessages() { final List messages = new ArrayList<>(); messages.add("Hello"); messages.add("Good-bye"); - + final TestableProcessor proc = new TestableProcessor(null, messages); final TestRunner runner = TestRunners.newTestRunner(proc); runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); @@ -101,40 +99,40 @@ public class TestGetKafka { runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs"); runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n"); runner.setProperty(GetKafka.BATCH_SIZE, "3"); - + runner.run(); - + runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1); final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0); mff.assertContentEquals("Hello\nGood-bye"); } - - + private static class TestableProcessor extends GetKafka { + private final byte[] key; private final Iterator messageItr; - + public TestableProcessor(final byte[] key, final List messages) { this.key = key; messageItr 
= messages.iterator(); } - + @Override public void createConsumers(ProcessContext context) { } - + @Override @SuppressWarnings({"unchecked", "rawtypes"}) protected ConsumerIterator getStreamIterator() { final ConsumerIterator itr = Mockito.mock(ConsumerIterator.class); - + Mockito.doAnswer(new Answer() { @Override public Boolean answer(final InvocationOnMock invocation) throws Throwable { return messageItr.hasNext(); } }).when(itr).hasNext(); - + Mockito.doAnswer(new Answer() { @Override public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable { @@ -142,21 +140,21 @@ public class TestGetKafka { Mockito.when(mam.key()).thenReturn(key); Mockito.when(mam.offset()).thenReturn(0L); Mockito.when(mam.partition()).thenReturn(0); - + Mockito.doAnswer(new Answer() { @Override public byte[] answer(InvocationOnMock invocation) throws Throwable { return messageItr.next().getBytes(); } - + }).when(mam).message(); - + return mam; } }).when(itr).next(); - + return itr; } } - + } diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 56a5c4b0ae..9500e29399 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -36,13 +36,19 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.provenance.ProvenanceReporter; -import org.apache.nifi.util.*; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.MockFlowFileQueue; +import org.apache.nifi.util.MockProcessSession; +import org.apache.nifi.util.MockProvenanceReporter; +import org.apache.nifi.util.MockSessionFactory; +import org.apache.nifi.util.SharedSessionState; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; import org.mockito.internal.util.reflection.Whitebox; - public class TestPutKafka { @Test @@ -53,15 +59,15 @@ public class TestPutKafka { runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - + runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes()); runner.run(); - + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); - + final List messages = proc.getProducer().getMessages(); assertEquals(11, messages.size()); - + assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0))); assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1))); assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2))); @@ -74,8 +80,7 @@ public class TestPutKafka { assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9))); assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10))); } - - + @Test public void testWithImmediateFailure() { final TestableProcessor proc = new TestableProcessor(0); @@ -84,17 +89,16 @@ public class TestPutKafka { runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); 
runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - + final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9"; runner.enqueue(text.getBytes()); runner.run(); - + runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); mff.assertContentEquals(text); } - - + @Test public void testPartialFailure() { final TestableProcessor proc = new TestableProcessor(2); @@ -104,22 +108,21 @@ public class TestPutKafka { runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); - + final byte[] bytes = "1\n2\n3\n4".getBytes(); runner.enqueue(bytes); runner.run(); - + runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); runner.assertTransferCount(PutKafka.REL_FAILURE, 1); final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); successFF.assertContentEquals("1\n2\n"); - + final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); failureFF.assertContentEquals("3\n4"); } - - + @Test public void testWithEmptyMessages() { final TestableProcessor proc = new TestableProcessor(); @@ -128,11 +131,11 @@ public class TestPutKafka { runner.setProperty(PutKafka.KEY, "key1"); runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); - + final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); runner.enqueue(bytes); runner.run(); - + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); final List msgs = proc.getProducer().getMessages(); @@ -144,7 +147,7 @@ public class TestPutKafka { } @Test - public void testProvenanceReporterMessagesCount(){ + public void testProvenanceReporterMessagesCount() { final TestableProcessor processor = new TestableProcessor(); ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter()); @@ -157,7 +160,6 @@ public class TestPutKafka { MockProcessSession mockProcessSession = new MockProcessSession(sharedState); Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession); - final TestRunner runner = TestRunners.newTestRunner(processor); Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue); Whitebox.setInternalState(runner, "sessionFactory", sessionFactory); @@ -176,7 +178,7 @@ public class TestPutKafka { } @Test - public void testProvenanceReporterWithoutDelimiterMessagesCount(){ + public void testProvenanceReporterWithoutDelimiterMessagesCount() { final TestableProcessor processor = new TestableProcessor(); ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter()); @@ -189,7 +191,6 @@ public class TestPutKafka { MockProcessSession mockProcessSession = new MockProcessSession(sharedState); Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession); - final TestRunner runner = TestRunners.newTestRunner(processor); Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue); Whitebox.setInternalState(runner, "sessionFactory", sessionFactory); @@ -206,97 +207,97 @@ public class TestPutKafka { Mockito.verify(spyProvenanceReporter, Mockito.atLeastOnce()).send(mockFlowFile, "kafka://topic1"); } - @Test - @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") - public void testKeyValuePut() { - final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); - 
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); - runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); - runner.setProperty(PutKafka.KEY, "${kafka.key}"); - runner.setProperty(PutKafka.TIMEOUT, "3 secs"); - runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); - - final Map attributes = new HashMap<>(); - attributes.put("kafka.topic", "test"); - attributes.put("kafka.key", "key3"); - - final byte[] data = "Hello, World, Again! ;)".getBytes(); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - runner.enqueue(data, attributes); - - runner.run(5); - - runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4); - final List mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); - final MockFlowFile mff = mffs.get(0); - - assertTrue(Arrays.equals(data, mff.toByteArray())); - } - - - private static class TestableProcessor extends PutKafka { - private MockProducer producer; - private int failAfter = Integer.MAX_VALUE; - - public TestableProcessor() { - } - - public TestableProcessor(final int failAfter) { - this.failAfter = failAfter; - } - - @OnScheduled - public void instantiateProducer(final ProcessContext context) { - producer = new MockProducer(createConfig(context)); - producer.setFailAfter(failAfter); - } - - @Override - protected Producer createProducer(final ProcessContext context) { - return producer; - } - - public MockProducer getProducer() { - return producer; - } - } - - - private static class MockProducer extends Producer { - private int sendCount = 0; - private int failAfter = Integer.MAX_VALUE; - - private final List messages = new ArrayList<>(); - + @Test + @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") + public void testKeyValuePut() { + final TestRunner runner = TestRunners.newTestRunner(PutKafka.class); + runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); + runner.setProperty(PutKafka.TOPIC, "${kafka.topic}"); + runner.setProperty(PutKafka.KEY, "${kafka.key}"); + runner.setProperty(PutKafka.TIMEOUT, "3 secs"); + runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue()); + + final Map attributes = new HashMap<>(); + attributes.put("kafka.topic", "test"); + attributes.put("kafka.key", "key3"); + + final byte[] data = "Hello, World, Again! 
;)".getBytes(); + runner.enqueue(data, attributes); + runner.enqueue(data, attributes); + runner.enqueue(data, attributes); + runner.enqueue(data, attributes); + + runner.run(5); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4); + final List mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS); + final MockFlowFile mff = mffs.get(0); + + assertTrue(Arrays.equals(data, mff.toByteArray())); + } + + private static class TestableProcessor extends PutKafka { + + private MockProducer producer; + private int failAfter = Integer.MAX_VALUE; + + public TestableProcessor() { + } + + public TestableProcessor(final int failAfter) { + this.failAfter = failAfter; + } + + @OnScheduled + public void instantiateProducer(final ProcessContext context) { + producer = new MockProducer(createConfig(context)); + producer.setFailAfter(failAfter); + } + + @Override + protected Producer createProducer(final ProcessContext context) { + return producer; + } + + public MockProducer getProducer() { + return producer; + } + } + + private static class MockProducer extends Producer { + + private int sendCount = 0; + private int failAfter = Integer.MAX_VALUE; + + private final List messages = new ArrayList<>(); + public MockProducer(final ProducerConfig config) { super(config); } - + @Override public void send(final KeyedMessage message) { - if ( ++sendCount > failAfter ) { + if (++sendCount > failAfter) { throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); } else { messages.add(message.message()); } } - + public List getMessages() { return messages; } - + @Override public void send(final List> messages) { - for ( final KeyedMessage msg : messages ) { + for (final KeyedMessage msg : messages) { send(msg); } } - + public void setFailAfter(final int successCount) { failAfter = successCount; } - } + } } diff --git a/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml index 0acb59edc6..74fef7037a 100644 --- a/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-kafka-bundle/pom.xml @@ -26,12 +26,12 @@ nifi-kafka-nar - - - org.apache.nifi - nifi-kafka-processors - 0.1.0-incubating-SNAPSHOT - - + + + org.apache.nifi + nifi-kafka-processors + 0.1.0-incubating-SNAPSHOT + +