NIFI-2298 This closes #687 added Kafka consume attributes to a FlowFile

This commit is contained in:
Oleg Zhurakousky 2016-07-20 12:18:55 -04:00 committed by joewitt
parent 99b4af7820
commit 0dbba811f3
2 changed files with 16 additions and 0 deletions

View File

@@ -21,8 +21,10 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -161,6 +163,7 @@ public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]
long start = System.nanoTime();
FlowFile flowFile = processSession.create();
final AtomicInteger messageCounter = new AtomicInteger();
final Map<String, String> kafkaAttributes = new HashMap<>();
final Iterator<ConsumerRecord<byte[], byte[]>> iter = consumedRecords.iterator();
while (iter.hasNext()){
@@ -168,12 +171,22 @@
@Override
public void process(final OutputStream out) throws IOException {
ConsumerRecord<byte[], byte[]> consumedRecord = iter.next();
kafkaAttributes.put("kafka.offset", String.valueOf(consumedRecord.offset()));
if (consumedRecord.key() != null) {
kafkaAttributes.put("kafka.key", new String(consumedRecord.key(), StandardCharsets.UTF_8));
}
kafkaAttributes.put("kafka.partition", String.valueOf(consumedRecord.partition()));
kafkaAttributes.put("kafka.topic", consumedRecord.topic());
if (messageCounter.getAndIncrement() > 0 && ConsumeKafka.this.demarcatorBytes != null) {
out.write(ConsumeKafka.this.demarcatorBytes);
}
out.write(consumedRecord.value());
}
});
flowFile = processSession.putAllAttributes(flowFile, kafkaAttributes);
/*
* Release FlowFile if there are more messages in the
* ConsumerRecords batch and no demarcator was provided,

View File

@@ -150,6 +150,9 @@ public class ConsumeKafkaTest {
assertEquals(2, flowFiles.size());
MockFlowFile flowFile = flowFiles.get(0);
String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah");
assertEquals("0", flowFile.getAttribute("kafka.partition"));
assertEquals("0", flowFile.getAttribute("kafka.offset"));
assertEquals("validateGetAllMessagesWithProvidedDemarcator", flowFile.getAttribute("kafka.topic"));
assertEquals(2, events.length);