diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java index 5949bf05b0..2ed2db9aea 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java @@ -21,8 +21,10 @@ import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -161,6 +163,7 @@ public class ConsumeKafka extends AbstractKafkaProcessor kafkaAttributes = new HashMap<>(); final Iterator> iter = consumedRecords.iterator(); while (iter.hasNext()){ @@ -168,12 +171,22 @@ public class ConsumeKafka extends AbstractKafkaProcessor consumedRecord = iter.next(); + + kafkaAttributes.put("kafka.offset", String.valueOf(consumedRecord.offset())); + if (consumedRecord.key() != null) { + kafkaAttributes.put("kafka.key", new String(consumedRecord.key(), StandardCharsets.UTF_8)); + } + kafkaAttributes.put("kafka.partition", String.valueOf(consumedRecord.partition())); + kafkaAttributes.put("kafka.topic", consumedRecord.topic()); + if (messageCounter.getAndIncrement() > 0 && ConsumeKafka.this.demarcatorBytes != null) { out.write(ConsumeKafka.this.demarcatorBytes); } out.write(consumedRecord.value()); } }); + + flowFile = processSession.putAllAttributes(flowFile, kafkaAttributes); /* * Release FlowFile if there are more messages in the * ConsumerRecords batch and no demarcator was provided, diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 2031e76143..374a91b8bc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -150,6 +150,9 @@ public class ConsumeKafkaTest { assertEquals(2, flowFiles.size()); MockFlowFile flowFile = flowFiles.get(0); String[] events = new String(flowFile.toByteArray(), StandardCharsets.UTF_8).split("blah"); + assertEquals("0", flowFile.getAttribute("kafka.partition")); + assertEquals("0", flowFile.getAttribute("kafka.offset")); + assertEquals("validateGetAllMessagesWithProvidedDemarcator", flowFile.getAttribute("kafka.topic")); assertEquals(2, events.length);