NIFI-11840 Add Kafka Partition and Topic Attributes before handoff to RecordReader

This closes #7510

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Paul Grey 2023-07-20 15:57:26 -04:00 committed by exceptionfactory
parent c68f5ec9a2
commit 5cad5838c5
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 50 additions and 4 deletions

View File

@ -545,8 +545,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final Map<String, String> attributes = getAttributes(consumerRecord); final Map<String, String> attributes = getAttributes(consumerRecord);
attributes.put(KAFKA_OFFSET, String.valueOf(consumerRecord.offset())); attributes.put(KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp())); attributes.put(KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
attributes.put(KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KAFKA_TOPIC, consumerRecord.topic());
FlowFile failureFlowFile = session.create(); FlowFile failureFlowFile = session.create();
@ -572,6 +570,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
protected Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) { protected Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
final Map<String, String> attributes = new HashMap<>(); final Map<String, String> attributes = new HashMap<>();
attributes.put(KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
attributes.put(KAFKA_TOPIC, consumerRecord.topic());
if (headerNamePattern == null) { if (headerNamePattern == null) {
return attributes; return attributes;
} }
@ -826,8 +827,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} }
} }
kafkaAttrs.put(KAFKA_PARTITION, String.valueOf(tracker.partition));
kafkaAttrs.put(KAFKA_TOPIC, tracker.topic);
if (tracker.totalRecords > 1) { if (tracker.totalRecords > 1) {
// Add a record.count attribute to remain consistent with other record-oriented processors. If not // Add a record.count attribute to remain consistent with other record-oriented processors. If not
// reading/writing records, then use "kafka.count" attribute. // reading/writing records, then use "kafka.count" attribute.

View File

@ -33,6 +33,9 @@ import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.record.TimestampType;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.kafka.shared.property.OutputStrategy; import org.apache.nifi.kafka.shared.property.OutputStrategy;
@ -94,6 +97,50 @@ public class TestConsumeKafkaMock {
*/ */
private static final String TEST_GROUP = "nifi-group-" + TIMESTAMP; private static final String TEST_GROUP = "nifi-group-" + TIMESTAMP;
/**
* Kafka topic attribute should be available to RecordReader, so that the 'ValueSeparator' character can be
* correctly configured.
*/
@Test
public void testConsumeRecordDynamicReader() throws Exception {
final String value = "ALPHA;BETA\na1;a2\nb1;b2\n";
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TEST_TOPIC, 0, 0, null, value.getBytes(UTF_8));
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final ConsumeKafkaRecord_2_6 processor = new ConsumeKafkaRecord_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return getConsumerPool(consumerRecords, context, log);
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false);
runner.setProperty(BOOTSTRAP_SERVERS, TEST_BOOTSTRAP_SERVER);
runner.setProperty("topic", TEST_TOPIC);
runner.setProperty("topic_type", "names");
runner.setProperty(ConsumerConfig.GROUP_ID_CONFIG, TEST_GROUP);
runner.setProperty("auto.offset.reset", "earliest");
final String readerId = "record-reader";
final RecordReaderFactory readerService = new CSVReader();
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new CSVRecordSetWriter();
runner.addControllerService(readerId, readerService);
runner.setProperty(readerService, CSVUtils.VALUE_SEPARATOR,
"${kafka.topic:startsWith('nifi-consume'):ifElse(';', ',')}");
runner.enableControllerService(readerService);
runner.setProperty(readerId, readerId);
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(writerId, writerId);
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
assertEquals(1, flowFiles.size());
final MockFlowFile flowFile = flowFiles.iterator().next();
assertEquals("ALPHA,BETA\na1,a2\nb1,b2\n", flowFile.getContent());
}
@Test @Test
public void testConsumeRecordNullKey() throws JsonProcessingException, InitializationException { public void testConsumeRecordNullKey() throws JsonProcessingException, InitializationException {
final ObjectNode node = mapper.createObjectNode().put("a", 1).put("b", "2"); final ObjectNode node = mapper.createObjectNode().put("a", 1).put("b", "2");