From 5cad5838c5b069e21596404a8f4d80c1e51a2e2b Mon Sep 17 00:00:00 2001
From: Paul Grey
Date: Thu, 20 Jul 2023 15:57:26 -0400
Subject: [PATCH] NIFI-11840 Add Kafka Partition and Topic Attributes before
 handoff to RecordReader

This closes #7510

Signed-off-by: David Handermann
---
 .../kafka/pubsub/ConsumerLease.java           |  7 ++-
 .../kafka/pubsub/TestConsumeKafkaMock.java    | 47 +++++++++++++++++++
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 644ca07048..698ecc36a5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -545,8 +545,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         final Map<String, String> attributes = getAttributes(consumerRecord);
         attributes.put(KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
         attributes.put(KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
-        attributes.put(KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KAFKA_TOPIC, consumerRecord.topic());
 
         FlowFile failureFlowFile = session.create();
 
@@ -572,6 +570,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
     protected Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
         final Map<String, String> attributes = new HashMap<>();
+        attributes.put(KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KAFKA_TOPIC, consumerRecord.topic());
+
         if (headerNamePattern == null) {
             return attributes;
         }
@@ -826,8 +827,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             }
         }
 
-        kafkaAttrs.put(KAFKA_PARTITION, String.valueOf(tracker.partition));
-        kafkaAttrs.put(KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
             // Add a record.count attribute to remain consistent with other record-oriented processors. If not
             // reading/writing records, then use "kafka.count" attribute.
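[Editor's note] The hunks above move the kafka.partition and kafka.topic attribute writes out of the per-bundle and parse-failure paths and into getAttributes(), so both attributes are already on the map when it is handed to the RecordReader. The following is a minimal standalone sketch of the resulting ordering; the class and method names are hypothetical stand-ins for the NiFi internals, not the patched code itself.

    import java.util.HashMap;
    import java.util.Map;

    // Standalone sketch: the attribute map is fully populated (topic and
    // partition included) before any reader is resolved, mirroring the
    // ordering this patch establishes inside ConsumerLease.
    public class AttributeOrderingSketch {
        static final String KAFKA_TOPIC = "kafka.topic";          // same keys the processor writes
        static final String KAFKA_PARTITION = "kafka.partition";

        // Hypothetical stand-in for ConsumerLease.getAttributes(consumerRecord):
        // topic and partition are now added here, up front.
        static Map<String, String> getAttributes(final String topic, final int partition) {
            final Map<String, String> attributes = new HashMap<>();
            attributes.put(KAFKA_PARTITION, String.valueOf(partition));
            attributes.put(KAFKA_TOPIC, topic);
            return attributes;
        }

        public static void main(final String[] args) {
            final Map<String, String> attributes = getAttributes("nifi-consume-1690000000000", 0);
            // A reader configured via Expression Language against these
            // attributes can branch on the topic name at creation time.
            final boolean semicolon = attributes.get(KAFKA_TOPIC).startsWith("nifi-consume");
            System.out.println("value separator: " + (semicolon ? ';' : ','));
        }
    }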
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaMock.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaMock.java
index ea43ab5afc..3259b3eac1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaMock.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaMock.java
@@ -33,6 +33,9 @@ import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.nifi.csv.CSVReader;
+import org.apache.nifi.csv.CSVRecordSetWriter;
+import org.apache.nifi.csv.CSVUtils;
 import org.apache.nifi.json.JsonRecordSetWriter;
 import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.kafka.shared.property.OutputStrategy;
@@ -94,6 +97,50 @@ public class TestConsumeKafkaMock {
      */
     private static final String TEST_GROUP = "nifi-group-" + TIMESTAMP;
 
+    /**
+     * Kafka topic attribute should be available to RecordReader, so that the 'ValueSeparator' character can be
+     * correctly configured.
+     */
+    @Test
+    public void testConsumeRecordDynamicReader() throws Exception {
+        final String value = "ALPHA;BETA\na1;a2\nb1;b2\n";
+        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
+                TEST_TOPIC, 0, 0, null, value.getBytes(UTF_8));
+        final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
+
+        final ConsumeKafkaRecord_2_6 processor = new ConsumeKafkaRecord_2_6() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return getConsumerPool(consumerRecords, context, log);
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(BOOTSTRAP_SERVERS, TEST_BOOTSTRAP_SERVER);
+        runner.setProperty("topic", TEST_TOPIC);
+        runner.setProperty("topic_type", "names");
+        runner.setProperty(ConsumerConfig.GROUP_ID_CONFIG, TEST_GROUP);
+        runner.setProperty("auto.offset.reset", "earliest");
+        final String readerId = "record-reader";
+        final RecordReaderFactory readerService = new CSVReader();
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new CSVRecordSetWriter();
+        runner.addControllerService(readerId, readerService);
+        runner.setProperty(readerService, CSVUtils.VALUE_SEPARATOR,
+                "${kafka.topic:startsWith('nifi-consume'):ifElse(';', ',')}");
+        runner.enableControllerService(readerService);
+        runner.setProperty(readerId, readerId);
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+        runner.setProperty(writerId, writerId);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals("ALPHA,BETA\na1,a2\nb1,b2\n", flowFile.getContent());
+    }
+
     @Test
     public void testConsumeRecordNullKey() throws JsonProcessingException, InitializationException {
         final ObjectNode node = mapper.createObjectNode().put("a", 1).put("b", "2");
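[Editor's note] The test above exercises the change end to end: the CSVReader's Value Separator is set with an Expression Language statement evaluated against the incoming FlowFile attributes, so a topic whose name starts with "nifi-consume" is parsed with ';' while any other topic would fall back to ','; the CSVRecordSetWriter then re-serializes with its default ',' separator, which is what the final assertion checks. A plain-Java sketch of that resolution and round trip follows, under the assumption that simple string replacement is a fair approximation of the reader/writer pair for this quote-free payload.

    // Plain-Java sketch of the test's round trip; string replacement stands in
    // for the CSVReader/CSVRecordSetWriter pair, which is a fair approximation
    // for this payload (no quoting or embedded separators).
    public class SeparatorRoundTripSketch {
        // Rendering of ${kafka.topic:startsWith('nifi-consume'):ifElse(';', ',')}
        static String resolveValueSeparator(final String kafkaTopic) {
            return kafkaTopic.startsWith("nifi-consume") ? ";" : ",";
        }

        public static void main(final String[] args) {
            final String topic = "nifi-consume-1690000000000";  // shape of TEST_TOPIC
            final String payload = "ALPHA;BETA\na1;a2\nb1;b2\n";
            final String readSeparator = resolveValueSeparator(topic);
            // The writer re-serializes with its default ',' separator.
            final String written = payload.replace(readSeparator, ",");
            System.out.println(written.equals("ALPHA,BETA\na1,a2\nb1,b2\n")); // true
        }
    }

Before this patch, the reader saw an attribute map without kafka.topic or kafka.partition, so the Expression Language statement would have resolved to the ',' fallback and the semicolon-delimited payload would not have been parsed correctly.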