diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 55c67e36d6..ea4296ef02 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -24,6 +24,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -81,6 +82,26 @@ public class GetKafka extends AbstractProcessor {
.expressionLanguageSupported(false)
.defaultValue("30 secs")
.build();
+ public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be "
+ + "concatenated together with the string placed between the content of each message. "
+ + "If the messages from Kafka should not be concatenated together, leave this value at 1.")
+ .required(true)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("1")
+ .build();
+ public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .name("Message Demarcator")
+ .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the "
+ + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, "
+ + "this value will be placed in between them.")
+ .required(true)
+ .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string
+ .expressionLanguageSupported(false)
+ .defaultValue("\\n")
+ .build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
@@ -113,6 +134,8 @@ public class GetKafka extends AbstractProcessor {
props.add(ZOOKEEPER_CONNECTION_STRING);
props.add(TOPIC);
props.add(ZOOKEEPER_COMMIT_DELAY);
+ props.add(BATCH_SIZE);
+ props.add(MESSAGE_DEMARCATOR);
props.add(clientNameWithDefault);
props.add(KAFKA_TIMEOUT);
props.add(ZOOKEEPER_TIMEOUT);
@@ -181,15 +204,25 @@ public class GetKafka extends AbstractProcessor {
}
}
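+ // Exposed as a protected method so that unit tests can override it and supply a mocked iterator.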
+ protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+ return streamIterators.poll();
+ }
+
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+ ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
if ( iterator == null ) {
return;
}
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
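+ // the demarcator is entered as text in the UI, so translate the escape sequences \n, \r, and \t into their literal characters; all other characters are used as-is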
+ final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+ final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
+ final String topic = context.getProperty(TOPIC).getValue();
+
FlowFile flowFile = null;
try {
+ // add the current thread to the Set of those to be interrupted if processor stopped.
interruptionLock.lock();
try {
interruptableThreads.add(Thread.currentThread());
@@ -197,52 +230,73 @@ public class GetKafka extends AbstractProcessor {
interruptionLock.unlock();
}
- try {
- if (!iterator.hasNext() ) {
- return;
- }
- } catch (final Exception e) {
- getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e});
- iterator = null;
- return;
- }
-
final long start = System.nanoTime();
- final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-
- if ( mam == null ) {
- return;
- }
-
- final byte[] key = mam.key();
+ flowFile = session.create();
final Map<String, String> attributes = new HashMap<>();
- if ( key != null ) {
- attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+ attributes.put("kafka.topic", topic);
+
+ int numMessages = 0;
+ for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+ // if the processor is stopped, iterator.hasNext() will throw an Exception.
+ // In this case, we just break out of the loop.
+ try {
+ if ( !iterator.hasNext() ) {
+ break;
+ }
+ } catch (final Exception e) {
+ break;
+ }
+
+ final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+ if ( mam == null ) {
+ return;
+ }
+
+ final byte[] key = mam.key();
+
+ if ( batchSize == 1 ) {
+ // the kafka.key, kafka.offset, and kafka.partition attributes are added only
+ // for a batch size of 1.
+ if ( key != null ) {
+ attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+ }
+
+ attributes.put("kafka.offset", String.valueOf(mam.offset()));
+ attributes.put("kafka.partition", String.valueOf(mam.partition()));
+ }
+
+ // add the message to the FlowFile's contents
+ final boolean firstMessage = (msgCount == 0);
+ flowFile = session.append(flowFile, new OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException {
+ if ( !firstMessage ) {
+ out.write(demarcatorBytes);
+ }
+ out.write(mam.message());
+ }
+ });
+ numMessages++;
}
- attributes.put("kafka.offset", String.valueOf(mam.offset()));
- attributes.put("kafka.partition", String.valueOf(mam.partition()));
- attributes.put("kafka.topic", mam.topic());
- flowFile = session.create();
- flowFile = session.write(flowFile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(mam.message());
- }
- });
-
- flowFile = session.putAllAttributes(flowFile, attributes);
- final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
- getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
- session.transfer(flowFile, REL_SUCCESS);
+ // If we received no messages, remove the FlowFile. Otherwise, send to success.
+ if ( flowFile.getSize() == 0L ) {
+ session.remove(flowFile);
+ } else {
+ flowFile = session.putAllAttributes(flowFile, attributes);
+ final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, millis);
+ getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
+ session.transfer(flowFile, REL_SUCCESS);
+ }
} catch (final Exception e) {
getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
if ( flowFile != null ) {
session.remove(flowFile);
}
} finally {
+ // Remove the current thread from the Set of Threads to interrupt.
interruptionLock.lock();
try {
interruptableThreads.remove(Thread.currentThread());
@@ -250,6 +304,7 @@ public class GetKafka extends AbstractProcessor {
interruptionLock.unlock();
}
+ // Add the iterator back to the queue
if ( iterator != null ) {
streamIterators.offer(iterator);
}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 5e5940cdfa..4b5a7423ad 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -48,7 +48,14 @@ import org.apache.nifi.processor.annotation.Tags;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+import org.apache.nifi.util.LongHolder;
+
+import scala.actors.threadpool.Arrays;
@SupportsBatching
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@@ -90,6 +97,24 @@ public class PutKafka extends AbstractProcessor {
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
+ public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+ .name("Message Delimiter")
+ .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+ + "If not specified, the entire content of the FlowFile will be used as a single message. "
+ + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+ + "sent as a separate Kafka message.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+ public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+ .name("Max Buffer Size")
+ .description("The maximum amount of data to buffer in memory before sending to Kafka")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .defaultValue("1 MB")
+ .build();
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
@@ -98,14 +123,6 @@ public class PutKafka extends AbstractProcessor {
.expressionLanguageSupported(false)
.defaultValue("30 secs")
.build();
- public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
- .name("Max FlowFile Size")
- .description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
- .required(true)
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(false)
- .defaultValue("1 MB")
- .build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
@@ -123,10 +140,6 @@ public class PutKafka extends AbstractProcessor {
.name("failure")
.description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
.build();
- public static final Relationship REL_REJECT = new Relationship.Builder()
- .name("reject")
- .description("Any FlowFile whose size exceeds the property will be routed to this Relationship")
- .build();
private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
@@ -142,8 +155,9 @@ public class PutKafka extends AbstractProcessor {
props.add(TOPIC);
props.add(KEY);
props.add(DELIVERY_GUARANTEE);
+ props.add(MESSAGE_DELIMITER);
+ props.add(MAX_BUFFER_SIZE);
props.add(TIMEOUT);
- props.add(MAX_FLOWFILE_SIZE);
props.add(clientName);
return props;
}
@@ -153,7 +167,6 @@ public class PutKafka extends AbstractProcessor {
final Set<Relationship> relationships = new HashSet<>(1);
relationships.add(REL_SUCCESS);
relationships.add(REL_FAILURE);
- relationships.add(REL_REJECT);
return relationships;
}
@@ -167,11 +180,10 @@ public class PutKafka extends AbstractProcessor {
}
}
-
- private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
- final String brokers = context.getProperty(SEED_BROKERS).getValue();
+ protected ProducerConfig createConfig(final ProcessContext context) {
+ final String brokers = context.getProperty(SEED_BROKERS).getValue();
- final Properties properties = new Properties();
+ final Properties properties = new Properties();
properties.setProperty("metadata.broker.list", brokers);
properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
@@ -180,8 +192,11 @@ public class PutKafka extends AbstractProcessor {
properties.setProperty("message.send.max.retries", "1");
properties.setProperty("producer.type", "sync");
- final ProducerConfig config = new ProducerConfig(properties);
- return new Producer<>(config);
+ return new ProducerConfig(properties);
+ }
+
+ protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+ return new Producer<>(createConfig(context));
}
private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
@@ -201,52 +216,200 @@ public class PutKafka extends AbstractProcessor {
}
final long start = System.nanoTime();
- final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
- if ( flowFile.getSize() > maxSize ) {
- getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
- session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
- session.transfer(flowFile, REL_REJECT);
- return;
- }
-
final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+ final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
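+ // an optional delimiter splits the FlowFile content into multiple messages; escaped \n, \r, and \t sequences are translated to their literal characters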
+ String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+ if ( delimiter != null ) {
+ delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+ }
- final byte[] value = new byte[(int) flowFile.getSize()];
- session.read(flowFile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, value);
- }
- });
-
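+ // Max Buffer Size is only used when a delimiter is set: it caps how many message bytes are buffered before each producer.send() call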
+ final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
final Producer<byte[], byte[]> producer = borrowProducer(context);
- boolean error = false;
- try {
- final KeyedMessage<byte[], byte[]> message;
- if ( key == null ) {
- message = new KeyedMessage<>(topic, value);
- } else {
- message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
- }
-
- producer.send(message);
- final long nanos = System.nanoTime() - start;
-
- session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
- session.transfer(flowFile, REL_SUCCESS);
- getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
- } catch (final Exception e) {
- getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
- error = true;
- } finally {
- if ( error ) {
- producer.close();
- } else {
- returnProducer(producer);
- }
+
+ if ( delimiter == null ) {
+ // Send the entire FlowFile as a single message.
+ final byte[] value = new byte[(int) flowFile.getSize()];
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream in) throws IOException {
+ StreamUtils.fillBuffer(in, value);
+ }
+ });
+
+ boolean error = false;
+ try {
+ final KeyedMessage<byte[], byte[]> message;
+ if ( key == null ) {
+ message = new KeyedMessage<>(topic, value);
+ } else {
+ message = new KeyedMessage<>(topic, keyBytes, value);
+ }
+
+ producer.send(message);
+ final long nanos = System.nanoTime() - start;
+
+ session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+ } catch (final Exception e) {
+ getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+ session.transfer(flowFile, REL_FAILURE);
+ error = true;
+ } finally {
+ if ( error ) {
+ producer.close();
+ } else {
+ returnProducer(producer);
+ }
+ }
+ } else {
+ final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
+
+ // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
+ // if it matches some pattern. We can use this to search for the delimiter as we read through
+ // the stream of bytes in the FlowFile
+ final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
+
+ boolean error = false;
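+ // LongHolders let the anonymous InputStreamCallback below record the byte offset of the last successfully sent message and the number of messages sent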
+ final LongHolder lastMessageOffset = new LongHolder(0L);
+ final LongHolder messagesSent = new LongHolder(0L);
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ byte[] data = null; // contents of a single message
+
+ boolean streamFinished = false;
+
+ final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
+ long messageBytes = 0L; // size of messages in the 'messages' list
+
+ int nextByte;
+ try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
+ final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+
+ // read until we're out of data.
+ while (!streamFinished) {
+ nextByte = in.read();
+
+ if ( nextByte > -1 ) {
+ baos.write(nextByte);
+ }
+
+ if (nextByte == -1) {
+ // we ran out of data. This message is complete.
+ data = baos.toByteArray();
+ streamFinished = true;
+ } else if ( buffer.addAndCompare((byte) nextByte) ) {
+ // we matched our delimiter. This message is complete. We want all of the bytes from the
+ // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
+ // the delimiter itself to be sent.
+ data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
+ }
+
+ createMessage: if ( data != null ) {
+ // If the message has no data, ignore it.
+ if ( data.length == 0 ) {
+ data = null;
+ baos.reset();
+ break createMessage;
+ }
+
+ // either we ran out of data or we reached the end of the message.
+ // Either way, create the message because it's ready to send.
+ final KeyedMessage<byte[], byte[]> message;
+ if ( key == null ) {
+ message = new KeyedMessage<>(topic, data);
+ } else {
+ message = new KeyedMessage<>(topic, keyBytes, data);
+ }
+
+ // Add the message to the list of messages ready to send. If we've reached our
+ // threshold of how many we're willing to send (or if we're out of data), go ahead
+ // and send the whole List.
+ messages.add(message);
+ messageBytes += data.length;
+ if ( messageBytes >= maxBufferSize || streamFinished ) {
+ // send the messages, then reset our state.
+ try {
+ producer.send(messages);
+ } catch (final Exception e) {
+ // wrap the exception in a ProcessException so that the catch block in onTrigger can
+ // distinguish a failure to send messages to Kafka from a general Exception or
+ // RuntimeException that would indicate a bug in the Processor.
+ throw new ProcessException("Failed to send messages to Kafka", e);
+ }
+
+ messagesSent.addAndGet(messages.size()); // count number of messages sent
+
+ // reset state
+ messages.clear();
+ messageBytes = 0;
+
+ // We've successfully sent a batch of messages. Keep track of the byte offset in the
+ // FlowFile of the last successfully sent message. This way, if the messages cannot
+ // all be successfully sent, we know where to split off the data. This allows us to then
+ // split off the first X number of bytes and send to 'success' and then split off the rest
+ // and send them to 'failure'.
+ lastMessageOffset.set(in.getBytesConsumed());
+ }
+
+ // reset BAOS so that we can start a new message.
+ baos.reset();
+ data = null;
+ }
+ }
+
+ // If there are messages left, send them
+ if ( !messages.isEmpty() ) {
+ producer.send(messages);
+ }
+ }
+ }
+ });
+
+ final long nanos = System.nanoTime() - start;
+
+ session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+ session.transfer(flowFile, REL_SUCCESS);
+ getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+ } catch (final ProcessException pe) {
+ error = true;
+
+ // There was a failure sending messages to Kafka. If the lastMessageOffset is 0, then all of them failed and we can
+ // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
+ // 'success' while we send the others to 'failure'.
+ final long offset = lastMessageOffset.get();
+ if ( offset == 0L ) {
+ // all of the messages failed to send. Route FlowFile to failure
+ getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] {flowFile, pe.getCause()});
+ session.transfer(flowFile, REL_FAILURE);
+ } else {
+ // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
+ final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
+ final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
+
+ getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
+ messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
+
+ session.transfer(successfulMessages, REL_SUCCESS);
+ session.transfer(failedMessages, REL_FAILURE);
+ session.remove(flowFile);
+ session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
+ }
+ } finally {
+ if ( error ) {
+ producer.close();
+ } else {
+ returnProducer(producer);
+ }
+ }
+
}
}
-
+
}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
index d429d6b5eb..279dd759f4 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -53,22 +53,24 @@
- <tr>
- <td>kafka.key</td>
- <td>The key of the Kafka message, if it exists. If the message does not have a key,
- this attribute will not be added.</td>
- </tr>
 <tr>
 <td>kafka.topic</td>
 <td>The name of the Kafka Topic from which the message was received</td>
 </tr>
+ <tr>
+ <td>kafka.key</td>
+ <td>The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key,
+ or if the batch size is greater than 1, this attribute will not be added.</td>
+ </tr>
 <tr>
 <td>kafka.partition</td>
- <td>The partition of the Kafka Topic from which the message was received</td>
+ <td>The partition of the Kafka Topic from which the message was received. This attribute is added only
+ if the batch size is 1.</td>
 </tr>
 <tr>
 <td>kafka.offset</td>
- <td>The offset of the message within the Kafka partition</td>
+ <td>The offset of the message within the Kafka partition. This attribute is added only
+ if the batch size is 1.</td>
 </tr>
@@ -123,6 +125,30 @@
Supports expression language: false
+
+ Batch Size
+
+ - Specifies the maximum number of messages to combine into a single FlowFile.
+ These messages will be concatenated together with the <Message Demarcator>
+ string placed between the content of each message. If the messages from Kafka
+ should not be concatenated together, leave this value at 1.
+ - Default value: 1
+ - Supports expression language: false
+
+
+
+ Message Demarcator
+
+ - Specifies the characters to use in order to demarcate multiple messages from Kafka.
+ If the <Batch Size> property is set to 1, this value is ignored. Otherwise, for each two
+ subsequent messages in the batch, this value will be placed in between them. This property will
+ treat "\n" as a new-line, "\r" as a carriage return and "\t" as a tab character. All other
+ characters are treated as literal characters.
+
+ - Default value: \n
+ - Supports expression language: false
+
+
Client Name
- Client Name to use when communicating with Kafka
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
index 38256c5fe9..29b7c176f5 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -31,6 +31,16 @@
<Kafka Key> Property.
+
+ The Processor allows the user to configure an optional Message Delimiter that
+ can be used to send many messages per FlowFile. For example, a \n could be used
+ to indicate that the contents of the FlowFile should be used to send one message
+ per line of text. If the property is not set, the entire contents of the FlowFile
+ will be sent as a single message. When using the delimiter, if some messages are
+ successfully sent but other messages fail to send, the FlowFile will be FORKed into
+ two child FlowFiles, with the successfully sent messages being routed to 'success'
+ and the messages that could not be sent going to 'failure'.
+
Properties:
@@ -45,7 +55,7 @@
-
A comma-separated list of known Kafka Brokers in the format
- &lgt;host>:<port>. This list does not need to be
+ <host>:<port>. This list does not need to be
exhaustive but provides a mechanism for determining which
other nodes belong to the Kafka cluster.
@@ -106,6 +116,18 @@
- Supports expression language: false
+ Message Delimiter
+
+ -
+ Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile.
+ If not specified, the entire content of the FlowFile will be used as a single message.
+ If specified, the contents of the FlowFile will be split on this delimiter and each section
+ sent as a separate Kafka message.
+
+ - Default value: no default
+ - Supports expression language: true
+
+
Communications Timeout
-
@@ -116,16 +138,10 @@
- Supports expression language: false
- Max FlowFile Size
+ Max Buffer Size
-
- Specifies the amount of data that can be buffered to send to Kafka. Because
- the contents of the FlowFile must be buffered into memory before they can
- be sent to Kafka, attempting to send a very large FlowFile can cause
- problems by causing the machine to run out of memory.
- This helps to prevent the system from running out of memory, the PutKafka
- Processor exposes a property for specifying the maximum size of a FlowFile.
- If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
+ The maximum amount of data to buffer in memory before sending to Kafka
- Default value: 1 MB
- Supports expression language: false
@@ -148,25 +164,21 @@
- success
- All FlowFiles that are successfully sent to Kafka are routed
- to this relationship.
-
-
-
-
- - reject
-
- - Any FlowFile whose content size exceeds the configured value for
- the <Max FlowFile Size> property will be routed to this
- relationship.
+ to this relationship. If using the <Message Delimiter> property,
+ it's possible for some messages to be sent while others fail. In this
+ case, only the messages that are successfully sent will be routed to
+ this Relationship while the other messages will be routed to the
+ 'failure' relationship.
- failure
- - All FlowFiles that cannot be sent to Kafka for any reason other
- than their content size exceeding the value of the <Max FlowFile
- Size> property will be routed to this relationship.
+
+ - All FlowFiles that cannot be sent to Kafka for any reason will be routed
+ to this relationship. If a portion of a FlowFile is successfully sent
+ to Kafka but not all, only those messages that cannot be sent to Kafka
+ will be routed to this Relationship.
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
index 2199a9ce7a..10560f8e39 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -16,20 +16,27 @@
*/
package org.apache.nifi.processors.kafka;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
-@Ignore("Intended only for local tests to verify functionality.")
public class TestGetKafka {
- public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
@BeforeClass
public static void configureLogging() {
@@ -39,9 +46,10 @@ public class TestGetKafka {
}
@Test
+ @Ignore("Intended only for local tests to verify functionality.")
public void testIntegrationLocally() {
final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
- runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+ runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181");
runner.setProperty(GetKafka.TOPIC, "testX");
runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
@@ -56,4 +64,99 @@ public class TestGetKafka {
}
}
+
+ @Test
+ public void testWithDelimiter() {
+ final List<String> messages = new ArrayList<>();
+ messages.add("Hello");
+ messages.add("Good-bye");
+
+ final TestableProcessor proc = new TestableProcessor(null, messages);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+ runner.setProperty(GetKafka.TOPIC, "testX");
+ runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+ runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+ runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+ runner.setProperty(GetKafka.BATCH_SIZE, "2");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+ mff.assertContentEquals("Hello\nGood-bye");
+ }
+
+ @Test
+ public void testWithDelimiterAndNotEnoughMessages() {
+ final List<String> messages = new ArrayList<>();
+ messages.add("Hello");
+ messages.add("Good-bye");
+
+ final TestableProcessor proc = new TestableProcessor(null, messages);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+ runner.setProperty(GetKafka.TOPIC, "testX");
+ runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+ runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+ runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+ runner.setProperty(GetKafka.BATCH_SIZE, "3");
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+ mff.assertContentEquals("Hello\nGood-bye");
+ }
+
+
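+ // GetKafka subclass for testing: createConsumers() is a no-op and getStreamIterator() returns a mocked iterator that serves the supplied messages.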
+ private static class TestableProcessor extends GetKafka {
+ private final byte[] key;
+ private final Iterator<String> messageItr;
+
+ public TestableProcessor(final byte[] key, final List<String> messages) {
+ this.key = key;
+ messageItr = messages.iterator();
+ }
+
+ @Override
+ public void createConsumers(ProcessContext context) {
+ }
+
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+ final ConsumerIterator<byte[], byte[]> itr = Mockito.mock(ConsumerIterator.class);
+
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Boolean answer(final InvocationOnMock invocation) throws Throwable {
+ return messageItr.hasNext();
+ }
+ }).when(itr).hasNext();
+
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable {
+ final MessageAndMetadata mam = Mockito.mock(MessageAndMetadata.class);
+ Mockito.when(mam.key()).thenReturn(key);
+ Mockito.when(mam.offset()).thenReturn(0L);
+ Mockito.when(mam.partition()).thenReturn(0);
+
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public byte[] answer(InvocationOnMock invocation) throws Throwable {
+ return messageItr.next().getBytes();
+ }
+
+ }).when(mam).message();
+
+ return mam;
+ }
+ }).when(itr).next();
+
+ return itr;
+ }
+ }
+
}
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 2e6aacfd7d..cf7ed681a1 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -1,12 +1,22 @@
package org.apache.nifi.processors.kafka;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import kafka.common.FailedToSendMessageException;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.annotation.OnScheduled;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -14,10 +24,109 @@ import org.junit.Ignore;
import org.junit.Test;
-@Ignore("Intended only for local testing to verify functionality.")
public class TestPutKafka {
+ @Test
+ public void testMultipleKeyValuePerFlowFile() {
+ final TestableProcessor proc = new TestableProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutKafka.TOPIC, "topic1");
+ runner.setProperty(PutKafka.KEY, "key1");
+ runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+ runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+ runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+
+ final List<byte[]> messages = proc.getProducer().getMessages();
+ assertEquals(11, messages.size());
+
+ assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
+ assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
+ assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
+ assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3)));
+ assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4)));
+ assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5)));
+ assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6)));
+ assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7)));
+ assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8)));
+ assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
+ assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
+ }
+
+
+ @Test
+ public void testWithImmediateFailure() {
+ final TestableProcessor proc = new TestableProcessor(0);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutKafka.TOPIC, "topic1");
+ runner.setProperty(PutKafka.KEY, "key1");
+ runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+ runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+ final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
+ runner.enqueue(text.getBytes());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+ mff.assertContentEquals(text);
+ }
+
+
+ @Test
+ public void testPartialFailure() {
+ final TestableProcessor proc = new TestableProcessor(2);
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutKafka.TOPIC, "topic1");
+ runner.setProperty(PutKafka.KEY, "key1");
+ runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+ runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
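+ // a 1 B Max Buffer Size forces each message to be sent (and its offset recorded) individually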
+ runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+
+ final byte[] bytes = "1\n2\n3\n4".getBytes();
+ runner.enqueue(bytes);
+ runner.run();
+
+ runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
+ runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+ final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+ successFF.assertContentEquals("1\n2\n");
+
+ final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+ failureFF.assertContentEquals("3\n4");
+ }
+
+
+ @Test
+ public void testWithEmptyMessages() {
+ final TestableProcessor proc = new TestableProcessor();
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setProperty(PutKafka.TOPIC, "topic1");
+ runner.setProperty(PutKafka.KEY, "key1");
+ runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+ runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
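+ // consecutive delimiters produce empty segments, which are skipped rather than sent as empty messages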
+ final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+ runner.enqueue(bytes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+
+ final List<byte[]> msgs = proc.getProducer().getMessages();
+ assertEquals(4, msgs.size());
+ assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
+ assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
+ assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
+ assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
+ }
+
+
@Test
+ @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
public void testKeyValuePut() {
final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
@@ -45,4 +154,67 @@ public class TestPutKafka {
assertTrue(Arrays.equals(data, mff.toByteArray()));
}
+
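+ // PutKafka subclass for testing: createProducer() returns a MockProducer instead of connecting to a real Kafka broker.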
+ private static class TestableProcessor extends PutKafka {
+ private MockProducer producer;
+ private int failAfter = Integer.MAX_VALUE;
+
+ public TestableProcessor() {
+ }
+
+ public TestableProcessor(final int failAfter) {
+ this.failAfter = failAfter;
+ }
+
+ @OnScheduled
+ public void instantiateProducer(final ProcessContext context) {
+ producer = new MockProducer(createConfig(context));
+ producer.setFailAfter(failAfter);
+ }
+
+ @Override
+ protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+ return producer;
+ }
+
+ public MockProducer getProducer() {
+ return producer;
+ }
+ }
+
+
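+ // Producer stub that records the payload of each message sent and can be configured to fail after a given number of successful sends.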
+ private static class MockProducer extends Producer<byte[], byte[]> {
+ private int sendCount = 0;
+ private int failAfter = Integer.MAX_VALUE;
+
+ private final List<byte[]> messages = new ArrayList<>();
+
+ public MockProducer(final ProducerConfig config) {
+ super(config);
+ }
+
+ @Override
+ public void send(final KeyedMessage<byte[], byte[]> message) {
+ if ( ++sendCount > failAfter ) {
+ throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
+ } else {
+ messages.add(message.message());
+ }
+ }
+
+ public List<byte[]> getMessages() {
+ return messages;
+ }
+
+ @Override
+ public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
+ for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
+ send(msg);
+ }
+ }
+
+ public void setFailAfter(final int successCount) {
+ failAfter = successCount;
+ }
+ }
}