joewitt 2015-04-25 09:20:35 -04:00
parent b2a1f5217d
commit 8f2502c4e4
7 changed files with 532 additions and 524 deletions

View File

@@ -61,14 +61,18 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@CapabilityDescription("Fetches messages from Apache Kafka")
@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
@WritesAttributes({ @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
@WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
@WritesAttributes({
@WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"),
@WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If"
+ " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
@WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
@WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1") })
@WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
public class GetKafka extends AbstractProcessor {
public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
.name("ZooKeeper Connection String")
.description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188")
.description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>"
+ " combinations. For example, host1:2181,host2:2181,host3:2188")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(false)
@@ -82,7 +86,8 @@ public class GetKafka extends AbstractProcessor {
.build();
public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
.name("Zookeeper Commit Frequency")
.description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
.description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will"
+ " result in better overall performance but can result in more data duplication if a NiFi node is lost")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(false)
@@ -137,7 +142,6 @@ public class GetKafka extends AbstractProcessor {
.description("All FlowFiles that are created are routed to this relationship")
.build();
private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
private volatile ConsumerConnector consumer;
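Note: the descriptors above can be exercised with NiFi's test framework. A minimal configuration sketch, assuming the usual package for these processors and illustrative display names for the Topic and Batch Size properties (neither constant is visible in this hunk):

import org.apache.nifi.processors.kafka.GetKafka;   // assumed package, not shown in this diff
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class GetKafkaConfigSketch {
    public static void main(final String[] args) {
        final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
        // Descriptors shown in this diff:
        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "host1:2181,host2:2181");
        runner.setProperty(GetKafka.ZOOKEEPER_COMMIT_DELAY, "5 secs"); // any value accepted by TIME_PERIOD_VALIDATOR
        // Assumed display names for the remaining required properties:
        runner.setProperty("Topic Name", "my-topic");
        runner.setProperty("Batch Size", "1"); // batch size 1 adds the kafka.key/partition/offset attributes
    }
}

Actually running the processor would open a live ZooKeeper/Kafka consumer in its scheduled method, which is why the unit tests further down substitute a TestableProcessor instead of talking to a broker.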
@@ -193,14 +197,14 @@ public class GetKafka extends AbstractProcessor {
this.streamIterators.clear();
for ( final KafkaStream<byte[], byte[]> stream : streams ) {
for (final KafkaStream<byte[], byte[]> stream : streams) {
streamIterators.add(stream.iterator());
}
}
@OnStopped
public void shutdownConsumer() {
if ( consumer != null ) {
if (consumer != null) {
try {
consumer.commitOffsets();
} finally {
@@ -216,7 +220,7 @@ public class GetKafka extends AbstractProcessor {
// ability to shutdown the Processor.
interruptionLock.lock();
try {
for ( final Thread t : interruptableThreads ) {
for (final Thread t : interruptableThreads) {
t.interrupt();
}
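Note: the interrupt loop above wakes consumer threads that are blocked inside the Kafka client when the processor is stopped. The registration side is not visible in this hunk; a self-contained sketch of the assumed pattern (the field names interruptionLock and interruptableThreads are borrowed from the code above, their exact types are assumed):

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative only: record the current thread before a blocking call so that a later
// interruptAll() (as in the shutdown path above) can wake it, and always deregister afterwards.
class InterruptRegistrySketch {
    private final Lock interruptionLock = new ReentrantLock();
    private final Set<Thread> interruptableThreads = new HashSet<>();

    void runInterruptibly(final Runnable blockingWork) {
        interruptionLock.lock();
        try {
            interruptableThreads.add(Thread.currentThread());
        } finally {
            interruptionLock.unlock();
        }
        try {
            blockingWork.run(); // e.g. work that blocks inside iterator.hasNext()
        } finally {
            interruptionLock.lock();
            try {
                interruptableThreads.remove(Thread.currentThread());
            } finally {
                interruptionLock.unlock();
            }
        }
    }

    void interruptAll() {
        interruptionLock.lock();
        try {
            for (final Thread t : interruptableThreads) {
                t.interrupt();
            }
        } finally {
            interruptionLock.unlock();
        }
    }
}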
@@ -233,7 +237,7 @@ public class GetKafka extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
if ( iterator == null ) {
if (iterator == null) {
return;
}
@@ -263,7 +267,7 @@ public class GetKafka extends AbstractProcessor {
// if the processor is stopped, iterator.hasNext() will throw an Exception.
// In this case, we just break out of the loop.
try {
if ( !iterator.hasNext() ) {
if (!iterator.hasNext()) {
break;
}
} catch (final Exception e) {
@@ -271,16 +275,16 @@ public class GetKafka extends AbstractProcessor {
}
final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
if ( mam == null ) {
if (mam == null) {
return;
}
final byte[] key = mam.key();
if ( batchSize == 1 ) {
if (batchSize == 1) {
// the kafka.key, kafka.offset, and kafka.partition attributes are added only
// for a batch size of 1.
if ( key != null ) {
if (key != null) {
attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
}
@@ -293,7 +297,7 @@ public class GetKafka extends AbstractProcessor {
flowFile = session.append(flowFile, new OutputStreamCallback() {
@Override
public void process(final OutputStream out) throws IOException {
if ( !firstMessage ) {
if (!firstMessage) {
out.write(demarcatorBytes);
}
out.write(mam.message());
@@ -303,18 +307,18 @@ public class GetKafka extends AbstractProcessor {
}
// If we received no messages, remove the FlowFile. Otherwise, send to success.
if ( flowFile.getSize() == 0L ) {
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
} else {
flowFile = session.putAllAttributes(flowFile, attributes);
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis);
getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, numMessages, millis});
session.transfer(flowFile, REL_SUCCESS);
}
} catch (final Exception e) {
getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
if ( flowFile != null ) {
getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
if (flowFile != null) {
session.remove(flowFile);
}
} finally {
@@ -327,7 +331,7 @@ public class GetKafka extends AbstractProcessor {
}
// Add the iterator back to the queue
if ( iterator != null ) {
if (iterator != null) {
streamIterators.offer(iterator);
}
}
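Note: the append callback above writes the demarcator between messages so that a batch of Kafka messages lands in a single FlowFile. Outside of the NiFi session API, the same concatenation logic looks like this (a standalone sketch of the behaviour, not the processor's own code):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

final class DemarcatorSketch {
    // Write demarcatorBytes before every message except the first, mirroring the
    // 'firstMessage' check in GetKafka.onTrigger above.
    static byte[] join(final List<byte[]> messages, final byte[] demarcatorBytes) throws IOException {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean firstMessage = true;
        for (final byte[] message : messages) {
            if (!firstMessage) {
                out.write(demarcatorBytes);
            }
            out.write(message);
            firstMessage = false;
        }
        return out.toByteArray();
    }

    public static void main(final String[] args) throws IOException {
        final byte[] joined = join(
                Arrays.asList("Hello".getBytes(StandardCharsets.UTF_8), "Good-bye".getBytes(StandardCharsets.UTF_8)),
                "\n".getBytes(StandardCharsets.UTF_8));
        // Prints Hello\nGood-bye, the same content TestGetKafka's testWithDelimiter asserts further down.
        System.out.println(new String(joined, StandardCharsets.UTF_8));
    }
}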

View File

@@ -61,12 +61,18 @@ import scala.actors.threadpool.Arrays;
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
public class PutKafka extends AbstractProcessor {
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed"
+ " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than"
+ " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after"
+ " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result"
+ " in data loss.");
public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
.name("Known Brokers")
@@ -131,7 +137,6 @@ public class PutKafka extends AbstractProcessor {
.expressionLanguageSupported(false)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
@@ -170,7 +175,6 @@ public class PutKafka extends AbstractProcessor {
return relationships;
}
@OnStopped
public void closeProducers() {
Producer<byte[], byte[]> producer;
@@ -211,7 +215,7 @@ public class PutKafka extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if ( flowFile == null ) {
if (flowFile == null) {
return;
}
@@ -220,14 +224,14 @@ public class PutKafka extends AbstractProcessor {
final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
if ( delimiter != null ) {
if (delimiter != null) {
delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
}
final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
final Producer<byte[], byte[]> producer = borrowProducer(context);
if ( delimiter == null ) {
if (delimiter == null) {
// Send the entire FlowFile as a single message.
final byte[] value = new byte[(int) flowFile.getSize()];
session.read(flowFile, new InputStreamCallback() {
@@ -240,7 +244,7 @@ public class PutKafka extends AbstractProcessor {
boolean error = false;
try {
final KeyedMessage<byte[], byte[]> message;
if ( key == null ) {
if (key == null) {
message = new KeyedMessage<>(topic, value);
} else {
message = new KeyedMessage<>(topic, keyBytes, value);
@@ -251,13 +255,13 @@ public class PutKafka extends AbstractProcessor {
session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[]{flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
} catch (final Exception e) {
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[]{flowFile, e});
session.transfer(flowFile, REL_FAILURE);
error = true;
} finally {
if ( error ) {
if (error) {
producer.close();
} else {
returnProducer(producer);
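Note: borrowProducer(...) and returnProducer(...) above imply a small pool of producers reused across onTrigger invocations, with a failed producer closed instead of returned. A minimal sketch of such a pool, assuming a queue-backed implementation like the one closeProducers() appears to drain (createProducer is the assumed factory from the earlier sketch):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.javaapi.producer.Producer;

// Illustrative pool sketch: reuse an idle producer when one is available, create a new one
// otherwise, and close every pooled producer on shutdown.
final class ProducerPoolSketch {
    private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();

    Producer<byte[], byte[]> borrowProducer(final String brokers, final String acks) {
        final Producer<byte[], byte[]> producer = producers.poll();
        return producer == null ? ProducerConfigSketch.createProducer(brokers, acks) : producer;
    }

    void returnProducer(final Producer<byte[], byte[]> producer) {
        producers.offer(producer);
    }

    void closeProducers() {
        Producer<byte[], byte[]> producer;
        while ((producer = producers.poll()) != null) {
            producer.close();
        }
    }
}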
@@ -294,7 +298,7 @@ public class PutKafka extends AbstractProcessor {
while (!streamFinished) {
nextByte = in.read();
if ( nextByte > -1 ) {
if (nextByte > -1) {
baos.write(nextByte);
}
@@ -302,16 +306,16 @@ public class PutKafka extends AbstractProcessor {
// we ran out of data. This message is complete.
data = baos.toByteArray();
streamFinished = true;
} else if ( buffer.addAndCompare((byte) nextByte) ) {
} else if (buffer.addAndCompare((byte) nextByte)) {
// we matched our delimiter. This message is complete. We want all of the bytes from the
// underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
// the delimiter itself to be sent.
data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
}
if ( data != null ) {
if (data != null) {
// If the message has no data, ignore it.
if ( data.length != 0 ) {
if (data.length != 0) {
// either we ran out of data or we reached the end of the message.
// Either way, create the message because it's ready to send.
final KeyedMessage<byte[], byte[]> message;
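Note: the loop above scans the stream byte by byte, uses buffer.addAndCompare(...) to detect the delimiter, and trims the delimiter off the end of the captured bytes before building a KeyedMessage. A standalone sketch of the same splitting behaviour, without the processor's streaming buffer classes (the real code avoids the repeated array copies used here for brevity):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DelimiterSplitSketch {
    // Split the stream on delimiterBytes; the delimiter itself is never included in a message,
    // and empty messages (two adjacent delimiters) are skipped, as in the processor loop above.
    static List<byte[]> split(final InputStream in, final byte[] delimiterBytes) throws IOException {
        final List<byte[]> messages = new ArrayList<>();
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int nextByte;
        while ((nextByte = in.read()) > -1) {
            baos.write(nextByte);
            if (endsWith(baos.toByteArray(), delimiterBytes)) {
                final byte[] data = Arrays.copyOfRange(baos.toByteArray(), 0, baos.size() - delimiterBytes.length);
                if (data.length != 0) {
                    messages.add(data);
                }
                baos.reset();
            }
        }
        if (baos.size() != 0) {
            messages.add(baos.toByteArray()); // trailing data with no delimiter is the last message
        }
        return messages;
    }

    private static boolean endsWith(final byte[] data, final byte[] suffix) {
        if (data.length < suffix.length) {
            return false;
        }
        return Arrays.equals(Arrays.copyOfRange(data, data.length - suffix.length, data.length), suffix);
    }
}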
@@ -361,7 +365,7 @@ public class PutKafka extends AbstractProcessor {
}
// If there are messages left, send them
if ( !messages.isEmpty() ) {
if (!messages.isEmpty()) {
try {
messagesSent.addAndGet(messages.size()); // add count of messages
producer.send(messages);
@@ -376,7 +380,7 @@ public class PutKafka extends AbstractProcessor {
final long nanos = System.nanoTime() - start;
session.getProvenanceReporter().send(flowFile, "kafka://" + topic, "Sent " + messagesSent.get() + " messages");
session.transfer(flowFile, REL_SUCCESS);
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[]{messagesSent.get(), flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
} catch (final ProcessException pe) {
error = true;
@@ -384,17 +388,18 @@ public class PutKafka extends AbstractProcessor {
// just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
// 'success' while we send the others to 'failure'.
final long offset = lastMessageOffset.get();
if ( offset == 0L ) {
if (offset == 0L) {
// all of the messages failed to send. Route FlowFile to failure
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[] {flowFile, pe.getCause()});
getLogger().error("Failed to send {} to Kafka due to {}; routing to fialure", new Object[]{flowFile, pe.getCause()});
session.transfer(flowFile, REL_FAILURE);
} else {
// Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into"
+ " two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[]{
messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause()});
session.transfer(successfulMessages, REL_SUCCESS);
session.transfer(failedMessages, REL_FAILURE);
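Note: session.clone(flowFile, offset, length) carves a child FlowFile out of a byte range of the parent, so the only bookkeeping the send loop needs is lastMessageOffset, the position just past the last message confirmed sent. A worked illustration with plain arrays (the concrete content is hypothetical, chosen to line up with the "3\n4" failure output that testPartialFailure asserts further down):

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartialFailureSplitSketch {
    public static void main(final String[] args) {
        // Hypothetical FlowFile content; the real test data is not visible in this diff.
        final byte[] content = "1\n2\n3\n4".getBytes(StandardCharsets.UTF_8);
        final long offset = 4L; // lastMessageOffset: just past the last successfully sent message ("1\n2\n")

        final byte[] successfulMessages = Arrays.copyOfRange(content, 0, (int) offset);
        final byte[] failedMessages = Arrays.copyOfRange(content, (int) offset, content.length);

        System.out.println(new String(successfulMessages, StandardCharsets.UTF_8)); // "1\n2\n" -> routed to 'success'
        System.out.println(new String(failedMessages, StandardCharsets.UTF_8));     // "3\n4"   -> routed to 'failure'
    }
}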
@@ -402,7 +407,7 @@ public class PutKafka extends AbstractProcessor {
session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
}
} finally {
if ( error ) {
if (error) {
producer.close();
} else {
returnProducer(producer);

View File

@@ -37,7 +37,6 @@ import org.mockito.stubbing.Answer;
public class TestGetKafka {
@BeforeClass
public static void configureLogging() {
System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
@@ -57,14 +56,13 @@ public class TestGetKafka {
runner.run(20, false);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
for ( final MockFlowFile flowFile : flowFiles ) {
for (final MockFlowFile flowFile : flowFiles) {
System.out.println(flowFile.getAttributes());
System.out.println(new String(flowFile.toByteArray()));
System.out.println();
}
}
@Test
public void testWithDelimiter() {
final List<String> messages = new ArrayList<>();
@@ -109,8 +107,8 @@ public class TestGetKafka {
mff.assertContentEquals("Hello\nGood-bye");
}
private static class TestableProcessor extends GetKafka {
private final byte[] key;
private final Iterator<String> messageItr;

View File

@@ -36,13 +36,19 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.util.*;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockFlowFileQueue;
import org.apache.nifi.util.MockProcessSession;
import org.apache.nifi.util.MockProvenanceReporter;
import org.apache.nifi.util.MockSessionFactory;
import org.apache.nifi.util.SharedSessionState;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
public class TestPutKafka {
@Test
@@ -75,7 +81,6 @@ public class TestPutKafka {
assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
}
@Test
public void testWithImmediateFailure() {
final TestableProcessor proc = new TestableProcessor(0);
@@ -94,7 +99,6 @@ public class TestPutKafka {
mff.assertContentEquals(text);
}
@Test
public void testPartialFailure() {
final TestableProcessor proc = new TestableProcessor(2);
@@ -119,7 +123,6 @@ public class TestPutKafka {
failureFF.assertContentEquals("3\n4");
}
@Test
public void testWithEmptyMessages() {
final TestableProcessor proc = new TestableProcessor();
@@ -144,7 +147,7 @@ public class TestPutKafka {
}
@Test
public void testProvenanceReporterMessagesCount(){
public void testProvenanceReporterMessagesCount() {
final TestableProcessor processor = new TestableProcessor();
ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
@@ -157,7 +160,6 @@ public class TestPutKafka {
MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
final TestRunner runner = TestRunners.newTestRunner(processor);
Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
@@ -176,7 +178,7 @@ public class TestPutKafka {
}
@Test
public void testProvenanceReporterWithoutDelimiterMessagesCount(){
public void testProvenanceReporterWithoutDelimiterMessagesCount() {
final TestableProcessor processor = new TestableProcessor();
ProvenanceReporter spyProvenanceReporter = Mockito.spy(new MockProvenanceReporter());
@@ -189,7 +191,6 @@ public class TestPutKafka {
MockProcessSession mockProcessSession = new MockProcessSession(sharedState);
Mockito.when(sessionFactory.createSession()).thenReturn(mockProcessSession);
final TestRunner runner = TestRunners.newTestRunner(processor);
Whitebox.setInternalState(runner, "flowFileQueue", flowFileQueue);
Whitebox.setInternalState(runner, "sessionFactory", sessionFactory);
@@ -235,8 +236,8 @@ public class TestPutKafka {
assertTrue(Arrays.equals(data, mff.toByteArray()));
}
private static class TestableProcessor extends PutKafka {
private MockProducer producer;
private int failAfter = Integer.MAX_VALUE;
@@ -263,8 +264,8 @@ public class TestPutKafka {
}
}
private static class MockProducer extends Producer<byte[], byte[]> {
private int sendCount = 0;
private int failAfter = Integer.MAX_VALUE;
@@ -276,7 +277,7 @@ public class TestPutKafka {
@Override
public void send(final KeyedMessage<byte[], byte[]> message) {
if ( ++sendCount > failAfter ) {
if (++sendCount > failAfter) {
throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
} else {
messages.add(message.message());
@@ -289,7 +290,7 @@ public class TestPutKafka {
@Override
public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
for (final KeyedMessage<byte[], byte[]> msg : messages) {
send(msg);
}
}