NIFI-220: Allow for demarcator to be specified for Kafka Get and Put and added unit tests; updated docs

Mark Payne 2015-01-04 21:10:34 -05:00
parent 3e2f790672
commit c91c7e7897
6 changed files with 662 additions and 131 deletions


@@ -24,6 +24,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -81,6 +82,26 @@ public class GetKafka extends AbstractProcessor {
         .expressionLanguageSupported(false)
         .defaultValue("30 secs")
         .build();
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be "
+            + "concatenated together with the <Message Demarcator> string placed between the content of each message. "
+            + "If the messages from Kafka should not be concatenated together, leave this value at 1.")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1")
+        .build();
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+        .name("Message Demarcator")
+        .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> "
+            + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, "
+            + "this value will be placed in between them.")
+        .required(true)
+        .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string
+        .expressionLanguageSupported(false)
+        .defaultValue("\\n")
+        .build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
         .name("Client Name")
         .description("Client Name to use when communicating with Kafka")
@@ -113,6 +134,8 @@ public class GetKafka extends AbstractProcessor {
         props.add(ZOOKEEPER_CONNECTION_STRING);
         props.add(TOPIC);
         props.add(ZOOKEEPER_COMMIT_DELAY);
+        props.add(BATCH_SIZE);
+        props.add(MESSAGE_DEMARCATOR);
         props.add(clientNameWithDefault);
         props.add(KAFKA_TIMEOUT);
         props.add(ZOOKEEPER_TIMEOUT);
@@ -181,15 +204,25 @@ public class GetKafka extends AbstractProcessor {
         }
     }

+    protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+        return streamIterators.poll();
+    }
+
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+        ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
         if ( iterator == null ) {
             return;
         }

+        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+        final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
+        final String topic = context.getProperty(TOPIC).getValue();
+
         FlowFile flowFile = null;
         try {
+            // add the current thread to the Set of those to be interrupted if processor stopped.
             interruptionLock.lock();
             try {
                 interruptableThreads.add(Thread.currentThread());
@@ -197,52 +230,73 @@ public class GetKafka extends AbstractProcessor {
                 interruptionLock.unlock();
             }

-            try {
-                if (!iterator.hasNext() ) {
-                    return;
-                }
-            } catch (final Exception e) {
-                getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e});
-                iterator = null;
-                return;
-            }
-
             final long start = System.nanoTime();
-            final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-            if ( mam == null ) {
-                return;
-            }
-            final byte[] key = mam.key();
+            flowFile = session.create();
+
             final Map<String, String> attributes = new HashMap<>();
-            if ( key != null ) {
-                attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
-            }
-            attributes.put("kafka.offset", String.valueOf(mam.offset()));
-            attributes.put("kafka.partition", String.valueOf(mam.partition()));
-            attributes.put("kafka.topic", mam.topic());
-
-            flowFile = session.create();
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    out.write(mam.message());
-                }
-            });
-
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-            session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
-            getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
-            session.transfer(flowFile, REL_SUCCESS);
+            attributes.put("kafka.topic", topic);
+
+            int numMessages = 0;
+            for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+                // if the processor is stopped, iterator.hasNext() will throw an Exception.
+                // In this case, we just break out of the loop.
+                try {
+                    if ( !iterator.hasNext() ) {
+                        break;
+                    }
+                } catch (final Exception e) {
+                    break;
+                }
+
+                final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+                if ( mam == null ) {
+                    return;
+                }
+
+                final byte[] key = mam.key();
+
+                if ( batchSize == 1 ) {
+                    // the kafka.key, kafka.offset, and kafka.partition attributes are added only
+                    // for a batch size of 1.
+                    if ( key != null ) {
+                        attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+                    }
+
+                    attributes.put("kafka.offset", String.valueOf(mam.offset()));
+                    attributes.put("kafka.partition", String.valueOf(mam.partition()));
+                }
+
+                // add the message to the FlowFile's contents
+                final boolean firstMessage = (msgCount == 0);
+                flowFile = session.append(flowFile, new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        if ( !firstMessage ) {
+                            out.write(demarcatorBytes);
+                        }
+                        out.write(mam.message());
+                    }
+                });
+                numMessages++;
+            }
+
+            // If we received no messages, remove the FlowFile. Otherwise, send to success.
+            if ( flowFile.getSize() == 0L ) {
+                session.remove(flowFile);
+            } else {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+                final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, millis);
+                getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
+                session.transfer(flowFile, REL_SUCCESS);
+            }
         } catch (final Exception e) {
             getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
             if ( flowFile != null ) {
                 session.remove(flowFile);
             }
         } finally {
+            // Remove the current thread from the Set of Threads to interrupt.
             interruptionLock.lock();
             try {
                 interruptableThreads.remove(Thread.currentThread());
@@ -250,6 +304,7 @@ public class GetKafka extends AbstractProcessor {
                 interruptionLock.unlock();
             }

+            // Add the iterator back to the queue
             if ( iterator != null ) {
                 streamIterators.offer(iterator);
             }
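The batching behavior above can be summarized outside of the NiFi API with the small sketch below. It is illustrative only (the class and method names are not part of this commit): the configured demarcator has the literal sequences \n, \r and \t translated into real control characters, and up to Batch Size messages are joined with the result, with nothing written before the first message or after the last.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

class DemarcatorSketch {
    // Translate the user-supplied demarcator the same way GetKafka does.
    static byte[] demarcatorBytes(final String configured) {
        return configured
                .replace("\\n", "\n")
                .replace("\\r", "\r")
                .replace("\\t", "\t")
                .getBytes(StandardCharsets.UTF_8);
    }

    // Join up to batchSize messages, inserting the demarcator only between consecutive messages.
    static byte[] combine(final List<byte[]> messages, final int batchSize, final byte[] demarcator) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final int limit = Math.min(batchSize, messages.size());
        for (int i = 0; i < limit; i++) {
            if (i > 0) {
                out.write(demarcator);
            }
            out.write(messages.get(i));
        }
        return out.toByteArray();
    }
}

With the default Batch Size of 1 the loop runs once and the demarcator is never written, which matches the property description; the testWithDelimiter test later in this commit exercises the two-message case and expects "Hello\nGood-bye".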


@@ -48,7 +48,14 @@ import org.apache.nifi.processor.annotation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+import org.apache.nifi.util.LongHolder;
+import scala.actors.threadpool.Arrays;

 @SupportsBatching
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@@ -90,6 +97,24 @@ public class PutKafka extends AbstractProcessor {
         .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
         .defaultValue(DELIVERY_BEST_EFFORT.getValue())
         .build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+        .name("Message Delimiter")
+        .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+            + "If not specified, the entire content of the FlowFile will be used as a single message. "
+            + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+            + "sent as a separate Kafka message.")
+        .required(false)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .build();
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+        .name("Max Buffer Size")
+        .description("The maximum amount of data to buffer in memory before sending to Kafka")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1 MB")
+        .build();
     public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
         .name("Communications Timeout")
         .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
@@ -98,14 +123,6 @@ public class PutKafka extends AbstractProcessor {
         .expressionLanguageSupported(false)
         .defaultValue("30 secs")
         .build();
-    public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
-        .name("Max FlowFile Size")
-        .description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
-        .required(true)
-        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .defaultValue("1 MB")
-        .build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
         .name("Client Name")
         .description("Client Name to use when communicating with Kafka")
@@ -123,10 +140,6 @@ public class PutKafka extends AbstractProcessor {
         .name("failure")
         .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
         .build();
-    public static final Relationship REL_REJECT = new Relationship.Builder()
-        .name("reject")
-        .description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
-        .build();

     private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
@@ -142,8 +155,9 @@ public class PutKafka extends AbstractProcessor {
         props.add(TOPIC);
         props.add(KEY);
         props.add(DELIVERY_GUARANTEE);
+        props.add(MESSAGE_DELIMITER);
+        props.add(MAX_BUFFER_SIZE);
         props.add(TIMEOUT);
-        props.add(MAX_FLOWFILE_SIZE);
         props.add(clientName);
         return props;
     }
@@ -153,7 +167,6 @@ public class PutKafka extends AbstractProcessor {
         final Set<Relationship> relationships = new HashSet<>(1);
         relationships.add(REL_SUCCESS);
         relationships.add(REL_FAILURE);
-        relationships.add(REL_REJECT);
         return relationships;
     }
@@ -167,11 +180,10 @@ public class PutKafka extends AbstractProcessor {
         }
     }

-    private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
-        final String brokers = context.getProperty(SEED_BROKERS).getValue();
+    protected ProducerConfig createConfig(final ProcessContext context) {
+        final String brokers = context.getProperty(SEED_BROKERS).getValue();

         final Properties properties = new Properties();
         properties.setProperty("metadata.broker.list", brokers);
         properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
         properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
@@ -180,8 +192,11 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("message.send.max.retries", "1");
         properties.setProperty("producer.type", "sync");

-        final ProducerConfig config = new ProducerConfig(properties);
-        return new Producer<>(config);
+        return new ProducerConfig(properties);
+    }
+
+    protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+        return new Producer<>(createConfig(context));
     }

     private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
@@ -201,52 +216,200 @@ public class PutKafka extends AbstractProcessor {
         }

         final long start = System.nanoTime();
-        final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
-        if ( flowFile.getSize() > maxSize ) {
-            getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
-            session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
-            session.transfer(flowFile, REL_REJECT);
-            return;
-        }
-
         final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+        if ( delimiter != null ) {
+            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        }

-        final byte[] value = new byte[(int) flowFile.getSize()];
-        session.read(flowFile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.fillBuffer(in, value);
-            }
-        });
-
+        final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
         final Producer<byte[], byte[]> producer = borrowProducer(context);
-        boolean error = false;
-        try {
-            final KeyedMessage<byte[], byte[]> message;
-            if ( key == null ) {
-                message = new KeyedMessage<>(topic, value);
-            } else {
-                message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
-            }
-
-            producer.send(message);
-            final long nanos = System.nanoTime() - start;
-
-            session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
-            session.transfer(flowFile, REL_SUCCESS);
-            getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
-        } catch (final Exception e) {
-            getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
-            session.transfer(flowFile, REL_FAILURE);
-            error = true;
-        } finally {
-            if ( error ) {
-                producer.close();
-            } else {
-                returnProducer(producer);
-            }
+
+        if ( delimiter == null ) {
+            // Send the entire FlowFile as a single message.
+            final byte[] value = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream in) throws IOException {
+                    StreamUtils.fillBuffer(in, value);
+                }
+            });
+
+            boolean error = false;
+            try {
+                final KeyedMessage<byte[], byte[]> message;
+                if ( key == null ) {
+                    message = new KeyedMessage<>(topic, value);
+                } else {
+                    message = new KeyedMessage<>(topic, keyBytes, value);
+                }
+
+                producer.send(message);
+                final long nanos = System.nanoTime() - start;
+
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+            } catch (final Exception e) {
+                getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+                session.transfer(flowFile, REL_FAILURE);
+                error = true;
+            } finally {
+                if ( error ) {
+                    producer.close();
+                } else {
+                    returnProducer(producer);
+                }
+            }
+        } else {
+            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
+
+            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
+            // if it matches some pattern. We can use this to search for the delimiter as we read through
+            // the stream of bytes in the FlowFile
+            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
+
+            boolean error = false;
+            final LongHolder lastMessageOffset = new LongHolder(0L);
+            final LongHolder messagesSent = new LongHolder(0L);
+
+            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        byte[] data = null; // contents of a single message
+                        boolean streamFinished = false;
+
+                        final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
+                        long messageBytes = 0L; // size of messages in the 'messages' list
+
+                        int nextByte;
+                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
+                             final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+
+                            // read until we're out of data.
+                            while (!streamFinished) {
+                                nextByte = in.read();
+                                if ( nextByte > -1 ) {
+                                    baos.write(nextByte);
+                                }
+
+                                if (nextByte == -1) {
+                                    // we ran out of data. This message is complete.
+                                    data = baos.toByteArray();
+                                    streamFinished = true;
+                                } else if ( buffer.addAndCompare((byte) nextByte) ) {
+                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
+                                    // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
+                                    // the delimiter itself to be sent.
+                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
+                                }
+
+                                createMessage: if ( data != null ) {
+                                    // If the message has no data, ignore it.
+                                    if ( data.length == 0 ) {
+                                        data = null;
+                                        baos.reset();
+                                        break createMessage;
+                                    }
+
+                                    // either we ran out of data or we reached the end of the message.
+                                    // Either way, create the message because it's ready to send.
+                                    final KeyedMessage<byte[], byte[]> message;
+                                    if ( key == null ) {
+                                        message = new KeyedMessage<>(topic, data);
+                                    } else {
+                                        message = new KeyedMessage<>(topic, keyBytes, data);
+                                    }
+
+                                    // Add the message to the list of messages ready to send. If we've reached our
+                                    // threshold of how many we're willing to send (or if we're out of data), go ahead
+                                    // and send the whole List.
+                                    messages.add(message);
+                                    messageBytes += data.length;
+                                    if ( messageBytes >= maxBufferSize || streamFinished ) {
+                                        // send the messages, then reset our state.
+                                        try {
+                                            producer.send(messages);
+                                        } catch (final Exception e) {
+                                            // we wrap the general exception in ProcessException because we want to separate
+                                            // failures in sending messages from general Exceptions that would indicate bugs
+                                            // in the Processor. Failure to send a message should be handled appropriately, but
+                                            // we don't want to catch the general Exception or RuntimeException in order to catch
+                                            // failures from Kafka's Producer.
+                                            throw new ProcessException("Failed to send messages to Kafka", e);
+                                        }
+
+                                        messagesSent.addAndGet(messages.size()); // count number of messages sent
+
+                                        // reset state
+                                        messages.clear();
+                                        messageBytes = 0;
+
+                                        // We've successfully sent a batch of messages. Keep track of the byte offset in the
+                                        // FlowFile of the last successfully sent message. This way, if the messages cannot
+                                        // all be successfully sent, we know where to split off the data. This allows us to then
+                                        // split off the first X number of bytes and send to 'success' and then split off the rest
+                                        // and send them to 'failure'.
+                                        lastMessageOffset.set(in.getBytesConsumed());
+                                    }
+
+                                    // reset BAOS so that we can start a new message.
+                                    baos.reset();
+                                    data = null;
+                                }
+                            }
+
+                            // If there are messages left, send them
+                            if ( !messages.isEmpty() ) {
+                                producer.send(messages);
+                            }
+                        }
+                    }
+                });
+
+                final long nanos = System.nanoTime() - start;
+
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+            } catch (final ProcessException pe) {
+                error = true;
+
+                // There was a failure sending messages to Kafka. If the lastMessageOffset is 0, then all of them failed and we can
+                // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
+                // 'success' while we send the others to 'failure'.
+                final long offset = lastMessageOffset.get();
+                if ( offset == 0L ) {
+                    // all of the messages failed to send. Route FlowFile to failure.
+                    getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, pe.getCause()});
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
+                    final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
+                    final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
+
+                    getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
+                        messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
+
+                    session.transfer(successfulMessages, REL_SUCCESS);
+                    session.transfer(failedMessages, REL_FAILURE);
+                    session.remove(flowFile);
+                    session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
+                }
+            } finally {
+                if ( error ) {
+                    producer.close();
+                } else {
+                    returnProducer(producer);
+                }
+            }
         }
     }
 }
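The byte-at-a-time delimiter matching above relies on NiFi's NonThreadSafeCircularBuffer. As a rough illustration of the same technique without that helper (the class and method below are invented for this sketch and are not part of the commit), a stream can be split on a delimiter like this, dropping the delimiter bytes and skipping empty messages just as the processor does:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DelimiterSplitSketch {
    // Split a stream into messages on a delimiter; the delimiter itself is never part of a message.
    static List<byte[]> split(final InputStream in, final byte[] delimiter) throws IOException {
        final List<byte[]> messages = new ArrayList<>();
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();

        int next;
        while ((next = in.read()) != -1) {
            baos.write(next);
            final byte[] soFar = baos.toByteArray();
            if (endsWith(soFar, delimiter)) {
                final byte[] body = Arrays.copyOfRange(soFar, 0, soFar.length - delimiter.length);
                if (body.length > 0) {
                    messages.add(body); // back-to-back delimiters produce empty messages, which are ignored
                }
                baos.reset();
            }
        }
        if (baos.size() > 0) {
            messages.add(baos.toByteArray()); // trailing data with no final delimiter is still a message
        }
        return messages;
    }

    private static boolean endsWith(final byte[] data, final byte[] suffix) {
        if (data.length < suffix.length) {
            return false;
        }
        for (int i = 0; i < suffix.length; i++) {
            if (data[data.length - suffix.length + i] != suffix[i]) {
                return false;
            }
        }
        return true;
    }
}

The processor itself avoids copying the accumulated buffer on every byte by keeping only the last delimiter-length bytes in a circular buffer, and it additionally batches the resulting messages up to Max Buffer Size before each producer.send() call, recording the byte offset of the last successful batch so that a partial failure can be split between 'success' and 'failure'.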


@@ -53,22 +53,24 @@
             </tr>
         </thead>
         <tbody>
-            <tr>
-                <td>kafka.key</td>
-                <td>The key of the Kafka message, if it exists. If the message does not have a key,
-                    this attribute will not be added.</td>
-            </tr>
             <tr>
                 <td>kafka.topic</td>
                 <td>The name of the Kafka Topic from which the message was received</td>
             </tr>
+            <tr>
+                <td>kafka.key</td>
+                <td>The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key,
+                    or if the batch size is greater than 1, this attribute will not be added.</td>
+            </tr>
             <tr>
                 <td>kafka.partition</td>
-                <td>The partition of the Kafka Topic from which the message was received</td>
+                <td>The partition of the Kafka Topic from which the message was received. This attribute is added only
+                    if the batch size is 1.</td>
             </tr>
             <tr>
                 <td>kafka.offset</td>
-                <td>The offset of the message within the Kafka partition</td>
+                <td>The offset of the message within the Kafka partition. This attribute is added only
+                    if the batch size is 1.</td>
             </tr>
         </tbody>
     </table>
@@ -123,6 +125,30 @@
             <li>Supports expression language: false</li>
         </ul>
     </li>
+    <li><strong>Batch Size</strong>
+        <ul>
+            <li>Specifies the maximum number of messages to combine into a single FlowFile.
+                These messages will be concatenated together with the &lt;Message Demarcator&gt;
+                string placed between the content of each message. If the messages from Kafka
+                should not be concatenated together, leave this value at 1.</li>
+            <li>Default value: 1</li>
+            <li>Supports expression language: false</li>
+        </ul>
+    </li>
+    <li><strong>Message Demarcator</strong>
+        <ul>
+            <li>Specifies the characters to use in order to demarcate multiple messages from Kafka.
+                If the &lt;Batch Size&gt; property is set to 1, this value is ignored. Otherwise, for each two
+                subsequent messages in the batch, this value will be placed in between them. This property will
+                treat "\n" as a new-line, "\r" as a carriage return and "\t" as a tab character. All other
+                characters are treated as literal characters.
+            </li>
+            <li>Default value: \n</li>
+            <li>Supports expression language: false</li>
+        </ul>
+    </li>
     <li><strong>Client Name</strong>
         <ul>
             <li>Client Name to use when communicating with Kafka</li>


@@ -31,6 +31,16 @@
             &lt;Kafka Key&gt; Property.
         </p>

+        <p>
+            The Processor allows the user to configure an optional Message Delimiter that
+            can be used to send many messages per FlowFile. For example, a \n could be used
+            to indicate that the contents of the FlowFile should be used to send one message
+            per line of text. If the property is not set, the entire contents of the FlowFile
+            will be sent as a single message. When using the delimiter, if some messages are
+            successfully sent but other messages fail to send, the FlowFile will be FORKed into
+            two child FlowFiles, with the successfully sent messages being routed to 'success'
+            and the messages that could not be sent going to 'failure'.
+        </p>
+
         <p>
             <strong>Properties:</strong>
@@ -45,7 +55,7 @@
             <ul>
                 <li>
                     A comma-separated list of known Kafka Brokers in the format
-                    &lgt;host&gt;:&lt;port&gt;. This list does not need to be
+                    &lt;host&gt;:&lt;port&gt;. This list does not need to be
                     exhaustive but provides a mechanism for determining which
                     other nodes belong to the Kafka cluster.
                 </li>
@@ -106,6 +116,18 @@
                 <li>Supports expression language: false</li>
             </ul>
         </li>
+        <li>Message Delimiter
+            <ul>
+                <li>
+                    Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile.
+                    If not specified, the entire content of the FlowFile will be used as a single message.
+                    If specified, the contents of the FlowFile will be split on this delimiter and each section
+                    sent as a separate Kafka message.
+                </li>
+                <li>Default value: no default</li>
+                <li>Supports expression language: true</li>
+            </ul>
+        </li>
         <li><strong>Communications Timeout</strong>
             <ul>
                 <li>
@@ -116,16 +138,10 @@
                 <li>Supports expression language: false</li>
             </ul>
         </li>
-        <li><strong>Max FlowFile Size</strong>
+        <li><strong>Max Buffer Size</strong>
             <ul>
                 <li>
-                    Specifies the amount of data that can be buffered to send to Kafka. Because
-                    the contents of the FlowFile must be buffered into memory before they can
-                    be sent to Kafka, attempting to send a very large FlowFile can cause
-                    problems by causing the machine to run out of memory.
-                    This helps to prevent the system from running out of memory, the PutKafka
-                    Processor exposes a property for specifying the maximum size of a FlowFile.
-                    If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
+                    The maximum amount of data to buffer in memory before sending to Kafka
                 </li>
                 <li>Default value: 1 MB</li>
                 <li>Supports expression language: false</li>
@@ -148,25 +164,21 @@
         <li>success
             <ul>
                 <li>All FlowFiles that are successfully sent to Kafka are routed
-                    to this relationship.
-                </li>
-            </ul>
-        </li>
-
-        <li>reject
-            <ul>
-                <li>Any FlowFile whose content size exceeds the configured value for
-                    the &lt;Max FlowFile Size&gt; property will be routed to this
-                    relationship.
+                    to this relationship. If using the &lt;Message Delimiter&gt; property,
+                    it's possible for some messages to be sent while others fail. In this
+                    case, only the messages that are successfully sent will be routed to
+                    this Relationship while the other messages will be routed to the
+                    'failure' relationship.
                 </li>
             </ul>
         </li>
         <li>failure
             <ul>
-                <li>All FlowFiles that cannot be sent to Kafka for any reason other
-                    than their content size exceeding the value of the &lt;Max FlowFile
-                    Size&gt; property will be routed to this relationship.
+                <li>All FlowFiles that cannot be sent to Kafka for any reason will be routed
+                    to this relationship. If a portion of a FlowFile is successfully sent
+                    to Kafka but not all, only those messages that cannot be sent to Kafka
+                    will be routed to this Relationship.
                 </li>
             </ul>
         </li>


@@ -16,20 +16,27 @@
  */
 package org.apache.nifi.processors.kafka;

+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;

+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
 import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;

-@Ignore("Intended only for local tests to verify functionality.")
 public class TestGetKafka {

-    public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
-
     @BeforeClass
     public static void configureLogging() {
@@ -39,9 +46,10 @@ public class TestGetKafka {
     }

     @Test
+    @Ignore("Intended only for local tests to verify functionality.")
     public void testIntegrationLocally() {
         final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
-        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181");
         runner.setProperty(GetKafka.TOPIC, "testX");
         runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
         runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
@@ -56,4 +64,99 @@ public class TestGetKafka {
         }
     }

+    @Test
+    public void testWithDelimiter() {
+        final List<String> messages = new ArrayList<>();
+        messages.add("Hello");
+        messages.add("Good-bye");
+
+        final TestableProcessor proc = new TestableProcessor(null, messages);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+        runner.setProperty(GetKafka.BATCH_SIZE, "2");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+        mff.assertContentEquals("Hello\nGood-bye");
+    }
+
+    @Test
+    public void testWithDelimiterAndNotEnoughMessages() {
+        final List<String> messages = new ArrayList<>();
+        messages.add("Hello");
+        messages.add("Good-bye");
+
+        final TestableProcessor proc = new TestableProcessor(null, messages);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+        runner.setProperty(GetKafka.BATCH_SIZE, "3");
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+        mff.assertContentEquals("Hello\nGood-bye");
+    }
+
+    private static class TestableProcessor extends GetKafka {
+        private final byte[] key;
+        private final Iterator<String> messageItr;
+
+        public TestableProcessor(final byte[] key, final List<String> messages) {
+            this.key = key;
+            messageItr = messages.iterator();
+        }
+
+        @Override
+        public void createConsumers(ProcessContext context) {
+        }
+
+        @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+            final ConsumerIterator<byte[], byte[]> itr = Mockito.mock(ConsumerIterator.class);
+
+            Mockito.doAnswer(new Answer<Boolean>() {
+                @Override
+                public Boolean answer(final InvocationOnMock invocation) throws Throwable {
+                    return messageItr.hasNext();
+                }
+            }).when(itr).hasNext();
+
+            Mockito.doAnswer(new Answer<MessageAndMetadata>() {
+                @Override
+                public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable {
+                    final MessageAndMetadata mam = Mockito.mock(MessageAndMetadata.class);
+                    Mockito.when(mam.key()).thenReturn(key);
+                    Mockito.when(mam.offset()).thenReturn(0L);
+                    Mockito.when(mam.partition()).thenReturn(0);
+
+                    Mockito.doAnswer(new Answer<byte[]>() {
+                        @Override
+                        public byte[] answer(InvocationOnMock invocation) throws Throwable {
+                            return messageItr.next().getBytes();
+                        }
+                    }).when(mam).message();
+
+                    return mam;
+                }
+            }).when(itr).next();
+
+            return itr;
+        }
+    }
 }


@@ -1,12 +1,22 @@
 package org.apache.nifi.processors.kafka;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

+import kafka.common.FailedToSendMessageException;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.annotation.OnScheduled;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -14,10 +24,109 @@ import org.junit.Ignore;
 import org.junit.Test;

-@Ignore("Intended only for local testing to verify functionality.")
 public class TestPutKafka {

+    @Test
+    public void testMultipleKeyValuePerFlowFile() {
+        final TestableProcessor proc = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+
+        final List<byte[]> messages = proc.getProducer().getMessages();
+        assertEquals(11, messages.size());
+
+        assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
+        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
+        assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
+        assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3)));
+        assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4)));
+        assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5)));
+        assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6)));
+        assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7)));
+        assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8)));
+        assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
+        assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
+    }
+
+    @Test
+    public void testWithImmediateFailure() {
+        final TestableProcessor proc = new TestableProcessor(0);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
+        runner.enqueue(text.getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        mff.assertContentEquals(text);
+    }
+
+    @Test
+    public void testPartialFailure() {
+        final TestableProcessor proc = new TestableProcessor(2);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+
+        final byte[] bytes = "1\n2\n3\n4".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        successFF.assertContentEquals("1\n2\n");
+
+        final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        failureFF.assertContentEquals("3\n4");
+    }
+
+    @Test
+    public void testWithEmptyMessages() {
+        final TestableProcessor proc = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+
+        final List<byte[]> msgs = proc.getProducer().getMessages();
+        assertEquals(4, msgs.size());
+        assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
+        assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
+        assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
+        assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
+    }
+
     @Test
+    @Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
     public void testKeyValuePut() {
         final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
         runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
@@ -45,4 +154,67 @@ public class TestPutKafka {
         assertTrue(Arrays.equals(data, mff.toByteArray()));
     }

+    private static class TestableProcessor extends PutKafka {
+        private MockProducer producer;
+        private int failAfter = Integer.MAX_VALUE;
+
+        public TestableProcessor() {
+        }
+
+        public TestableProcessor(final int failAfter) {
+            this.failAfter = failAfter;
+        }
+
+        @OnScheduled
+        public void instantiateProducer(final ProcessContext context) {
+            producer = new MockProducer(createConfig(context));
+            producer.setFailAfter(failAfter);
+        }
+
+        @Override
+        protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+            return producer;
+        }
+
+        public MockProducer getProducer() {
+            return producer;
+        }
+    }
+
+    private static class MockProducer extends Producer<byte[], byte[]> {
+        private int sendCount = 0;
+        private int failAfter = Integer.MAX_VALUE;
+        private final List<byte[]> messages = new ArrayList<>();
+
+        public MockProducer(final ProducerConfig config) {
+            super(config);
+        }
+
+        @Override
+        public void send(final KeyedMessage<byte[], byte[]> message) {
+            if ( ++sendCount > failAfter ) {
+                throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
+            } else {
+                messages.add(message.message());
+            }
+        }
+
+        public List<byte[]> getMessages() {
+            return messages;
+        }
+
+        @Override
+        public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
+            for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
+                send(msg);
+            }
+        }
+
+        public void setFailAfter(final int successCount) {
+            failAfter = successCount;
+        }
+    }
 }