From c91c7e78970a5261b05572dfbad02ee91287bf5e Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Sun, 4 Jan 2015 21:10:34 -0500 Subject: [PATCH] NIFI-220: Allow for demarcator to be specified for Kafka Get and Put and added unit tests; updated docs --- .../nifi/processors/kafka/GetKafka.java | 127 +++++--- .../nifi/processors/kafka/PutKafka.java | 287 ++++++++++++++---- .../index.html | 40 ++- .../index.html | 56 ++-- .../nifi/processors/kafka/TestGetKafka.java | 109 ++++++- .../nifi/processors/kafka/TestPutKafka.java | 174 ++++++++++- 6 files changed, 662 insertions(+), 131 deletions(-) diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 55c67e36d6..ea4296ef02 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -24,6 +24,7 @@ import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.Validator; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -81,6 +82,26 @@ public class GetKafka extends AbstractProcessor { .expressionLanguageSupported(false) .defaultValue("30 secs") .build(); + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " + + "concatenated together with the string placed between the content of each message. " + + "If the messages from Kafka should not be concatenated together, leave this value at 1.") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1") + .build(); + public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() + .name("Message Demarcator") + .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the " + + "property is set to 1, this value is ignored. 
Otherwise, for each two subsequent messages in the batch, " + + "this value will be placed in between them.") + .required(true) + .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string + .expressionLanguageSupported(false) + .defaultValue("\\n") + .build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() .name("Client Name") .description("Client Name to use when communicating with Kafka") @@ -113,6 +134,8 @@ public class GetKafka extends AbstractProcessor { props.add(ZOOKEEPER_CONNECTION_STRING); props.add(TOPIC); props.add(ZOOKEEPER_COMMIT_DELAY); + props.add(BATCH_SIZE); + props.add(MESSAGE_DEMARCATOR); props.add(clientNameWithDefault); props.add(KAFKA_TIMEOUT); props.add(ZOOKEEPER_TIMEOUT); @@ -181,15 +204,25 @@ public class GetKafka extends AbstractProcessor { } } + protected ConsumerIterator getStreamIterator() { + return streamIterators.poll(); + } + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - ConsumerIterator iterator = streamIterators.poll(); + ConsumerIterator iterator = getStreamIterator(); if ( iterator == null ) { return; } + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); + final String topic = context.getProperty(TOPIC).getValue(); + FlowFile flowFile = null; try { + // add the current thread to the Set of those to be interrupted if processor stopped. interruptionLock.lock(); try { interruptableThreads.add(Thread.currentThread()); @@ -197,52 +230,73 @@ public class GetKafka extends AbstractProcessor { interruptionLock.unlock(); } - try { - if (!iterator.hasNext() ) { - return; - } - } catch (final Exception e) { - getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e}); - iterator = null; - return; - } - final long start = System.nanoTime(); - final MessageAndMetadata mam = iterator.next(); - - if ( mam == null ) { - return; - } - - final byte[] key = mam.key(); + flowFile = session.create(); final Map attributes = new HashMap<>(); - if ( key != null ) { - attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); + attributes.put("kafka.topic", topic); + + int numMessages = 0; + for (int msgCount = 0; msgCount < batchSize; msgCount++) { + // if the processor is stopped, iterator.hasNext() will throw an Exception. + // In this case, we just break out of the loop. + try { + if ( !iterator.hasNext() ) { + break; + } + } catch (final Exception e) { + break; + } + + final MessageAndMetadata mam = iterator.next(); + if ( mam == null ) { + return; + } + + final byte[] key = mam.key(); + + if ( batchSize == 1 ) { + // the kafka.key, kafka.offset, and kafka.partition attributes are added only + // for a batch size of 1. 
+ if ( key != null ) { + attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); + } + + attributes.put("kafka.offset", String.valueOf(mam.offset())); + attributes.put("kafka.partition", String.valueOf(mam.partition())); + } + + // add the message to the FlowFile's contents + final boolean firstMessage = (msgCount == 0); + flowFile = session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + if ( !firstMessage ) { + out.write(demarcatorBytes); + } + out.write(mam.message()); + } + }); + numMessages++; } - attributes.put("kafka.offset", String.valueOf(mam.offset())); - attributes.put("kafka.partition", String.valueOf(mam.partition())); - attributes.put("kafka.topic", mam.topic()); - flowFile = session.create(); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(mam.message()); - } - }); - - flowFile = session.putAllAttributes(flowFile, attributes); - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis); - getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis}); - session.transfer(flowFile, REL_SUCCESS); + // If we received no messages, remove the FlowFile. Otherwise, send to success. + if ( flowFile.getSize() == 0L ) { + session.remove(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, millis); + getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis}); + session.transfer(flowFile, REL_SUCCESS); + } } catch (final Exception e) { getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e}); if ( flowFile != null ) { session.remove(flowFile); } } finally { + // Remove the current thread from the Set of Threads to interrupt. 
interruptionLock.lock(); try { interruptableThreads.remove(Thread.currentThread()); @@ -250,6 +304,7 @@ public class GetKafka extends AbstractProcessor { interruptionLock.unlock(); } + // Add the iterator back to the queue if ( iterator != null ) { streamIterators.offer(iterator); } diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 5e5940cdfa..4b5a7423ad 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -48,7 +48,14 @@ import org.apache.nifi.processor.annotation.Tags; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteArrayOutputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; +import org.apache.nifi.util.LongHolder; + +import scala.actors.threadpool.Arrays; @SupportsBatching @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) @@ -90,6 +97,24 @@ public class PutKafka extends AbstractProcessor { .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) .defaultValue(DELIVERY_BEST_EFFORT.getValue()) .build(); + public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() + .name("Message Delimiter") + .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " + + "If not specified, the entire content of the FlowFile will be used as a single message. " + + "If specified, the contents of the FlowFile will be split on this delimiter and each section " + + "sent as a separate Kafka message.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() + .name("Max Buffer Size") + .description("The maximum amount of data to buffer in memory before sending to Kafka") + .required(true) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("1 MB") + .build(); public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() .name("Communications Timeout") .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") @@ -98,14 +123,6 @@ public class PutKafka extends AbstractProcessor { .expressionLanguageSupported(false) .defaultValue("30 secs") .build(); - public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder() - .name("Max FlowFile Size") - .description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. 
This helps to prevent the system from running out of memory") - .required(true) - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .expressionLanguageSupported(false) - .defaultValue("1 MB") - .build(); public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() .name("Client Name") .description("Client Name to use when communicating with Kafka") @@ -123,10 +140,6 @@ public class PutKafka extends AbstractProcessor { .name("failure") .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") .build(); - public static final Relationship REL_REJECT = new Relationship.Builder() - .name("reject") - .description("Any FlowFile whose size exceeds the property will be routed to this Relationship") - .build(); private final BlockingQueue> producers = new LinkedBlockingQueue<>(); @@ -142,8 +155,9 @@ public class PutKafka extends AbstractProcessor { props.add(TOPIC); props.add(KEY); props.add(DELIVERY_GUARANTEE); + props.add(MESSAGE_DELIMITER); + props.add(MAX_BUFFER_SIZE); props.add(TIMEOUT); - props.add(MAX_FLOWFILE_SIZE); props.add(clientName); return props; } @@ -153,7 +167,6 @@ public class PutKafka extends AbstractProcessor { final Set relationships = new HashSet<>(1); relationships.add(REL_SUCCESS); relationships.add(REL_FAILURE); - relationships.add(REL_REJECT); return relationships; } @@ -167,11 +180,10 @@ public class PutKafka extends AbstractProcessor { } } - - private Producer createProducer(final ProcessContext context) { - final String brokers = context.getProperty(SEED_BROKERS).getValue(); + protected ProducerConfig createConfig(final ProcessContext context) { + final String brokers = context.getProperty(SEED_BROKERS).getValue(); - final Properties properties = new Properties(); + final Properties properties = new Properties(); properties.setProperty("metadata.broker.list", brokers); properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); @@ -180,8 +192,11 @@ public class PutKafka extends AbstractProcessor { properties.setProperty("message.send.max.retries", "1"); properties.setProperty("producer.type", "sync"); - final ProducerConfig config = new ProducerConfig(properties); - return new Producer<>(config); + return new ProducerConfig(properties); + } + + protected Producer createProducer(final ProcessContext context) { + return new Producer<>(createConfig(context)); } private Producer borrowProducer(final ProcessContext context) { @@ -201,52 +216,200 @@ public class PutKafka extends AbstractProcessor { } final long start = System.nanoTime(); - final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue(); - if ( flowFile.getSize() > maxSize ) { - getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile}); - session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize); - session.transfer(flowFile, REL_REJECT); - return; - } - final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final byte[] keyBytes = (key == null) ? 
null : key.getBytes(StandardCharsets.UTF_8); + String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); + if ( delimiter != null ) { + delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); + } - final byte[] value = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, value); - } - }); - + final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); final Producer producer = borrowProducer(context); - boolean error = false; - try { - final KeyedMessage message; - if ( key == null ) { - message = new KeyedMessage<>(topic, value); - } else { - message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value); - } - - producer.send(message); - final long nanos = System.nanoTime() - start; - - session.getProvenanceReporter().send(flowFile, "kafka://" + topic); - session.transfer(flowFile, REL_SUCCESS); - getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); - } catch (final Exception e) { - getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - error = true; - } finally { - if ( error ) { - producer.close(); - } else { - returnProducer(producer); - } + + if ( delimiter == null ) { + // Send the entire FlowFile as a single message. + final byte[] value = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, value); + } + }); + + boolean error = false; + try { + final KeyedMessage message; + if ( key == null ) { + message = new KeyedMessage<>(topic, value); + } else { + message = new KeyedMessage<>(topic, keyBytes, value); + } + + producer.send(message); + final long nanos = System.nanoTime() - start; + + session.getProvenanceReporter().send(flowFile, "kafka://" + topic); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + } catch (final Exception e) { + getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + error = true; + } finally { + if ( error ) { + producer.close(); + } else { + returnProducer(producer); + } + } + } else { + final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); + + // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see + // if it matches some pattern. 
We can use this to search for the delimiter as we read through + // the stream of bytes in the FlowFile + final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); + + boolean error = false; + final LongHolder lastMessageOffset = new LongHolder(0L); + final LongHolder messagesSent = new LongHolder(0L); + + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + byte[] data = null; // contents of a single message + + boolean streamFinished = false; + + final List> messages = new ArrayList<>(); // batch to send + long messageBytes = 0L; // size of messages in the 'messages' list + + int nextByte; + try (final InputStream bufferedIn = new BufferedInputStream(rawIn); + final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { + + // read until we're out of data. + while (!streamFinished) { + nextByte = in.read(); + + if ( nextByte > -1 ) { + baos.write(nextByte); + } + + if (nextByte == -1) { + // we ran out of data. This message is complete. + data = baos.toByteArray(); + streamFinished = true; + } else if ( buffer.addAndCompare((byte) nextByte) ) { + // we matched our delimiter. This message is complete. We want all of the bytes from the + // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want + // the delimiter itself to be sent. + data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length); + } + + createMessage: if ( data != null ) { + // If the message has no data, ignore it. + if ( data.length == 0 ) { + data = null; + baos.reset(); + break createMessage; + } + + // either we ran out of data or we reached the end of the message. + // Either way, create the message because it's ready to send. + final KeyedMessage message; + if ( key == null ) { + message = new KeyedMessage<>(topic, data); + } else { + message = new KeyedMessage<>(topic, keyBytes, data); + } + + // Add the message to the list of messages ready to send. If we've reached our + // threshold of how many we're willing to send (or if we're out of data), go ahead + // and send the whole List. + messages.add(message); + messageBytes += data.length; + if ( messageBytes >= maxBufferSize || streamFinished ) { + // send the messages, then reset our state. + try { + producer.send(messages); + } catch (final Exception e) { + // we wrap the general exception in ProcessException because we want to separate + // failures in sending messages from general Exceptions that would indicate bugs + // in the Processor. Failure to send a message should be handled appropriately, but + // we don't want to catch the general Exception or RuntimeException in order to catch + // failures from Kafka's Producer. + throw new ProcessException("Failed to send messages to Kafka", e); + } + + messagesSent.addAndGet(messages.size()); // count number of messages sent + + // reset state + messages.clear(); + messageBytes = 0; + + // We've successfully sent a batch of messages. Keep track of the byte offset in the + // FlowFile of the last successfully sent message. This way, if the messages cannot + // all be successfully sent, we know where to split off the data. This allows us to then + // split off the first X number of bytes and send to 'success' and then split off the rest + // and send them to 'failure'. 
+ lastMessageOffset.set(in.getBytesConsumed()); + } + + // reset BAOS so that we can start a new message. + baos.reset(); + data = null; + } + } + + // If there are messages left, send them + if ( !messages.isEmpty() ) { + producer.send(messages); + } + } + } + }); + + final long nanos = System.nanoTime() - start; + + session.getProvenanceReporter().send(flowFile, "kafka://" + topic); + session.transfer(flowFile, REL_SUCCESS); + getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)}); + } catch (final ProcessException pe) { + error = true; + + // There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can + // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to + // 'success' while we send the others to 'failure'. + final long offset = lastMessageOffset.get(); + if ( offset == 0L ) { + // all of the messages failed to send. Route FlowFile to failure + getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] {flowFile, pe.getCause()}); + session.transfer(flowFile, REL_FAILURE); + } else { + // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages. + final FlowFile successfulMessages = session.clone(flowFile, 0L, offset); + final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset); + + getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] { + messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() }); + + session.transfer(successfulMessages, REL_SUCCESS); + session.transfer(failedMessages, REL_FAILURE); + session.remove(flowFile); + session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic); + } + } finally { + if ( error ) { + producer.close(); + } else { + returnProducer(producer); + } + } + } } - + } diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html index d429d6b5eb..279dd759f4 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html +++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html @@ -53,22 +53,24 @@ - - kafka.key - The key of the Kafka message, if it exists. If the message does not have a key, - this attribute will not be added. - kafka.topic The name of the Kafka Topic from which the message was received + + kafka.key + The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key, + or if the batch size is greater than 1, this attribute will not be added. + kafka.partition - The partition of the Kafka Topic from which the message was received + The partition of the Kafka Topic from which the message was received. This attribute is added only + if the batch size is 1. kafka.offset - The offset of the message within the Kafka partition + The offset of the message within the Kafka partition. This attribute is added only + if the batch size is 1. @@ -123,6 +125,30 @@
  • Supports expression language: false
  • + +
  • Batch Size +
      +
    • Specifies the maximum number of messages to combine into a single FlowFile. + These messages will be concatenated together with the <Message Demarcator> + string placed between the content of each message. If the messages from Kafka + should not be concatenated together, leave this value at 1.
    • +
    • Default value: 1
    • +
    • Supports expression language: false
    • +
    +
  • + +
  • Message Demarcator +
      +
    • Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> property is set to 1, this value is ignored. Otherwise, this value will be placed between each pair of consecutive messages in the batch. This property treats "\n" as a new-line, "\r" as a carriage return, and "\t" as a tab character; all other characters are treated as literal characters. (A short configuration sketch follows this property list.)
    • +
    • Default value: \n
    • +
    • Supports expression language: false
    • +
    +
  • Client Name
    • Client Name to use when communicating with Kafka
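      For illustration, a minimal configuration sketch of the new batching behavior, modeled on the unit
      test added in this patch (TestGetKafka.testWithDelimiter). The topic name and ZooKeeper address are
      placeholders; running against GetKafka.class directly assumes a reachable Kafka/ZooKeeper (as in the
      ignored integration test), whereas the unit test mocks the consumer iterator.

          import org.apache.nifi.processors.kafka.GetKafka;
          import org.apache.nifi.util.TestRunner;
          import org.apache.nifi.util.TestRunners;

          public class GetKafkaBatchingSketch {
              public static void main(final String[] args) {
                  // Combine up to two Kafka messages into one FlowFile, separated by a newline.
                  // The literal "\\n" typed into the property is translated into a real new-line
                  // (the processor applies replace("\\n", "\n"), and likewise for "\\r" and "\\t").
                  final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
                  runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); // placeholder address
                  runner.setProperty(GetKafka.TOPIC, "events");                               // placeholder topic
                  runner.setProperty(GetKafka.BATCH_SIZE, "2");
                  runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
                  runner.run();

                  // If the topic held the messages "Hello" and "Good-bye", the single output FlowFile
                  // would contain "Hello\nGood-bye" and carry the kafka.topic attribute; kafka.key,
                  // kafka.offset and kafka.partition are added only when Batch Size is 1.
              }
          }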
    • diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html index 38256c5fe9..29b7c176f5 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html +++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html @@ -31,6 +31,16 @@ <Kafka Key> Property.

      +

      + The Processor allows the user to configure an optional Message Delimiter that + can be used to send many messages per FlowFile. For example, a \n could be used + to indicate that the contents of the FlowFile should be used to send one message + per line of text. If the property is not set, the entire contents of the FlowFile + will be sent as a single message. When using the delimiter, if some messages are + successfully sent but other messages fail to send, the FlowFile will be FORKed into + two child FlowFiles, with the successfully sent messages being routed to 'success' + and the messages that could not be sent going to 'failure'. +

      Properties: @@ -45,7 +55,7 @@

      • A comma-separated list of known Kafka Brokers in the format
-           &lgt;host>:<port>. This list does not need to be
+           <host>:<port>. This list does not need to be
            exhaustive but provides a mechanism for determining which other nodes belong to the Kafka cluster.
      • @@ -106,6 +116,18 @@
      • Supports expression language: false
      +
    • Message Delimiter +
        +
      • + Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. + If not specified, the entire content of the FlowFile will be used as a single message. + If specified, the contents of the FlowFile will be split on this delimiter and each section + sent as a separate Kafka message. +
      • +
      • Default value: no default
      • +
      • Supports expression language: true
      • +
      +
    • Communications Timeout
      • @@ -116,16 +138,10 @@
      • Supports expression language: false
    • -
    • Max FlowFile Size +
    • Max Buffer Size
      • - Specifies the amount of data that can be buffered to send to Kafka. Because - the contents of the FlowFile must be buffered into memory before they can - be sent to Kafka, attempting to send a very large FlowFile can cause - problems by causing the machine to run out of memory. - This helps to prevent the system from running out of memory, the PutKafka - Processor exposes a property for specifying the maximum size of a FlowFile. - If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. + The maximum amount of data to buffer in memory before sending to Kafka
      • Default value: 1 MB
      • Supports expression language: false
      • @@ -148,25 +164,21 @@
      • success
        • All FlowFiles that are successfully sent to Kafka are routed
-           to this relationship.
-
-      • reject
-
-        • Any FlowFile whose content size exceeds the configured value for
-           the <Max FlowFile Size> property will be routed to this
-           relationship.
+           to this relationship. If using the <Message Delimiter> property,
+           it's possible for some messages to be sent while others fail. In this
+           case, only the messages that are successfully sent will be routed to
+           this Relationship while the other messages will be routed to the
+           'failure' relationship.
      • failure
          -
        • All FlowFiles that cannot be sent to Kafka for any reason other - than their content size exceeding the value of the <Max FlowFile - Size> property will be routed to this relationship. +
        • All FlowFiles that cannot be sent to Kafka for any reason will be routed to this relationship. If a portion of a FlowFile is successfully sent to Kafka but not all, only those messages that cannot be sent to Kafka will be routed to this Relationship. A short configuration sketch follows.
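        For illustration, a minimal sketch of the delimiter and partial-failure behavior, modeled on the
        unit tests added in this patch (TestPutKafka.testMultipleKeyValuePerFlowFile and testPartialFailure).
        The broker address and topic are placeholders and a reachable broker is assumed; the unit tests
        drive the same logic with a mock Producer instead.

            import org.apache.nifi.processors.kafka.PutKafka;
            import org.apache.nifi.util.TestRunner;
            import org.apache.nifi.util.TestRunners;

            public class PutKafkaDelimiterSketch {
                public static void main(final String[] args) {
                    final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
                    runner.setProperty(PutKafka.SEED_BROKERS, "localhost:9092"); // placeholder broker
                    runner.setProperty(PutKafka.TOPIC, "events");                // placeholder topic
                    runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");       // "\\n" becomes a real new-line
                    runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 MB");        // messages are sent in batches up to this size

                    // With the delimiter set, each line becomes its own Kafka message ("1", "2", "3", "4");
                    // the delimiter itself is not transmitted.
                    runner.enqueue("1\n2\n3\n4".getBytes());
                    runner.run();

                    // Happy path: the FlowFile is routed to 'success'. If a later batch fails after an earlier
                    // batch succeeded (see TestPutKafka.testPartialFailure), the FlowFile is forked instead:
                    // the bytes covering the messages already sent go to 'success' and the remainder to 'failure'.
                }
            }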
      • diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java index 2199a9ce7a..10560f8e39 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java +++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java @@ -16,20 +16,27 @@ */ package org.apache.nifi.processors.kafka; +import java.util.ArrayList; +import java.util.Iterator; import java.util.List; +import kafka.consumer.ConsumerIterator; +import kafka.message.MessageAndMetadata; + import org.apache.log4j.BasicConfigurator; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; -@Ignore("Intended only for local tests to verify functionality.") public class TestGetKafka { - public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181"; @BeforeClass public static void configureLogging() { @@ -39,9 +46,10 @@ public class TestGetKafka { } @Test + @Ignore("Intended only for local tests to verify functionality.") public void testIntegrationLocally() { final TestRunner runner = TestRunners.newTestRunner(GetKafka.class); - runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION); + runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181"); runner.setProperty(GetKafka.TOPIC, "testX"); runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs"); @@ -56,4 +64,99 @@ public class TestGetKafka { } } + + @Test + public void testWithDelimiter() { + final List messages = new ArrayList<>(); + messages.add("Hello"); + messages.add("Good-bye"); + + final TestableProcessor proc = new TestableProcessor(null, messages); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + runner.setProperty(GetKafka.TOPIC, "testX"); + runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); + runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs"); + runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n"); + runner.setProperty(GetKafka.BATCH_SIZE, "2"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0); + mff.assertContentEquals("Hello\nGood-bye"); + } + + @Test + public void testWithDelimiterAndNotEnoughMessages() { + final List messages = new ArrayList<>(); + messages.add("Hello"); + messages.add("Good-bye"); + + final TestableProcessor proc = new TestableProcessor(null, messages); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); + runner.setProperty(GetKafka.TOPIC, "testX"); + runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs"); + runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs"); + runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n"); + runner.setProperty(GetKafka.BATCH_SIZE, "3"); + + runner.run(); + + runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1); + final 
MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0); + mff.assertContentEquals("Hello\nGood-bye"); + } + + + private static class TestableProcessor extends GetKafka { + private final byte[] key; + private final Iterator messageItr; + + public TestableProcessor(final byte[] key, final List messages) { + this.key = key; + messageItr = messages.iterator(); + } + + @Override + public void createConsumers(ProcessContext context) { + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + protected ConsumerIterator getStreamIterator() { + final ConsumerIterator itr = Mockito.mock(ConsumerIterator.class); + + Mockito.doAnswer(new Answer() { + @Override + public Boolean answer(final InvocationOnMock invocation) throws Throwable { + return messageItr.hasNext(); + } + }).when(itr).hasNext(); + + Mockito.doAnswer(new Answer() { + @Override + public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable { + final MessageAndMetadata mam = Mockito.mock(MessageAndMetadata.class); + Mockito.when(mam.key()).thenReturn(key); + Mockito.when(mam.offset()).thenReturn(0L); + Mockito.when(mam.partition()).thenReturn(0); + + Mockito.doAnswer(new Answer() { + @Override + public byte[] answer(InvocationOnMock invocation) throws Throwable { + return messageItr.next().getBytes(); + } + + }).when(mam).message(); + + return mam; + } + }).when(itr).next(); + + return itr; + } + } + } diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 2e6aacfd7d..cf7ed681a1 100644 --- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -1,12 +1,22 @@ package org.apache.nifi.processors.kafka; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import kafka.common.FailedToSendMessageException; +import kafka.javaapi.producer.Producer; +import kafka.producer.KeyedMessage; +import kafka.producer.ProducerConfig; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.annotation.OnScheduled; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -14,10 +24,109 @@ import org.junit.Ignore; import org.junit.Test; -@Ignore("Intended only for local testing to verify functionality.") public class TestPutKafka { + @Test + public void testMultipleKeyValuePerFlowFile() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + + runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + + final List messages = proc.getProducer().getMessages(); + assertEquals(11, messages.size()); + + assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0))); 
+ assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1))); + assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2))); + assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3))); + assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4))); + assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5))); + assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6))); + assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7))); + assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8))); + assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9))); + assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10))); + } + + + @Test + public void testWithImmediateFailure() { + final TestableProcessor proc = new TestableProcessor(0); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + + final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9"; + runner.enqueue(text.getBytes()); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); + mff.assertContentEquals(text); + } + + + @Test + public void testPartialFailure() { + final TestableProcessor proc = new TestableProcessor(2); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B"); + + final byte[] bytes = "1\n2\n3\n4".getBytes(); + runner.enqueue(bytes); + runner.run(); + + runner.assertTransferCount(PutKafka.REL_SUCCESS, 1); + runner.assertTransferCount(PutKafka.REL_FAILURE, 1); + + final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0); + successFF.assertContentEquals("1\n2\n"); + + final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0); + failureFF.assertContentEquals("3\n4"); + } + + + @Test + public void testWithEmptyMessages() { + final TestableProcessor proc = new TestableProcessor(); + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(PutKafka.TOPIC, "topic1"); + runner.setProperty(PutKafka.KEY, "key1"); + runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234"); + runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n"); + + final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(); + runner.enqueue(bytes); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1); + + final List msgs = proc.getProducer().getMessages(); + assertEquals(4, msgs.size()); + assertTrue(Arrays.equals("1".getBytes(), msgs.get(0))); + assertTrue(Arrays.equals("2".getBytes(), msgs.get(1))); + assertTrue(Arrays.equals("3".getBytes(), msgs.get(2))); + assertTrue(Arrays.equals("4".getBytes(), msgs.get(3))); + } + + @Test + @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...") public void testKeyValuePut() { final TestRunner runner = 
TestRunners.newTestRunner(PutKafka.class); runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092"); @@ -45,4 +154,67 @@ public class TestPutKafka { assertTrue(Arrays.equals(data, mff.toByteArray())); } + + private static class TestableProcessor extends PutKafka { + private MockProducer producer; + private int failAfter = Integer.MAX_VALUE; + + public TestableProcessor() { + } + + public TestableProcessor(final int failAfter) { + this.failAfter = failAfter; + } + + @OnScheduled + public void instantiateProducer(final ProcessContext context) { + producer = new MockProducer(createConfig(context)); + producer.setFailAfter(failAfter); + } + + @Override + protected Producer createProducer(final ProcessContext context) { + return producer; + } + + public MockProducer getProducer() { + return producer; + } + } + + + private static class MockProducer extends Producer { + private int sendCount = 0; + private int failAfter = Integer.MAX_VALUE; + + private final List messages = new ArrayList<>(); + + public MockProducer(final ProducerConfig config) { + super(config); + } + + @Override + public void send(final KeyedMessage message) { + if ( ++sendCount > failAfter ) { + throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages")); + } else { + messages.add(message.message()); + } + } + + public List getMessages() { + return messages; + } + + @Override + public void send(final List> messages) { + for ( final KeyedMessage msg : messages ) { + send(msg); + } + } + + public void setFailAfter(final int successCount) { + failAfter = successCount; + } + } }