NIFI-9771: When a Kafka record is obtained during config verification, we should produce an invalid response if the Record Reader is not able to produce any records from it

Signed-off-by: Joe Gresock <jgresock@gmail.com>

This closes #5847.
This commit is contained in:
Mark Payne 2022-03-07 11:54:37 -05:00 committed by Joe Gresock
parent f35f010deb
commit e6229ab938
No known key found for this signature in database
GPG Key ID: 37F5B9B6E258C8B7
1 changed files with 34 additions and 25 deletions

View File

@ -24,9 +24,9 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReader;
@ -403,22 +403,32 @@ public class ConsumerPool implements Closeable {
final Map<String, Integer> recordsPerTopic = new HashMap<>(); final Map<String, Integer> recordsPerTopic = new HashMap<>();
for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) { for (final ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
final Map<String, String> attributes = consumerLease.getAttributes(consumerRecord); final Map<String, String> attributes = consumerLease.getAttributes(consumerRecord);
int numRecords = 0;
final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value(); final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
try (final InputStream in = new ByteArrayInputStream(recordBytes)) { try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
final RecordReader reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger); final RecordReader reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
while (reader.nextRecord() != null) { while (reader.nextRecord() != null) {
numRecords++;
} }
} catch (final Exception e) { } catch (final Exception e) {
parseFailuresPerTopic.merge(consumerRecord.topic(), 1, Integer::sum); parseFailuresPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
latestParseFailureDescription.put(consumerRecord.topic(), e.toString()); latestParseFailureDescription.put(consumerRecord.topic(), e.toString());
} }
if (numRecords == 0) {
parseFailuresPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
latestParseFailureDescription.put(consumerRecord.topic(), "Received Kafka message but Record Reader produced no Record from it");
recordsPerTopic.merge(consumerRecord.topic(), 1, Integer::sum);
} else {
recordsPerTopic.merge(consumerRecord.topic(), numRecords, Integer::sum);
}
} }
// Note here that we do not commit the offsets. We will just let the consumer close without committing the offsets, which // Note here that we do not commit the offsets. We will just let the consumer close without committing the offsets, which
// will roll back the consumption of the messages. // will roll back the consumption of the messages.
if (parseFailuresPerTopic.isEmpty()) {
if (recordsPerTopic.isEmpty()) { if (recordsPerTopic.isEmpty()) {
return new ConfigVerificationResult.Builder() return new ConfigVerificationResult.Builder()
.verificationStepName("Parse Records") .verificationStepName("Parse Records")
@ -427,13 +437,13 @@ public class ConsumerPool implements Closeable {
.build(); .build();
} }
if (parseFailuresPerTopic.isEmpty()) {
return new ConfigVerificationResult.Builder() return new ConfigVerificationResult.Builder()
.verificationStepName("Parse Records") .verificationStepName("Parse Records")
.outcome(Outcome.SUCCESSFUL) .outcome(Outcome.SUCCESSFUL)
.explanation("Was able to parse all Records consumed from topics. Number of Records consumed from each topic: " + recordsPerTopic) .explanation("Was able to parse all Records consumed from topics. Number of Records consumed from each topic: " + recordsPerTopic)
.build(); .build();
} else { }
final Map<String, String> failureDescriptions = new HashMap<>(); final Map<String, String> failureDescriptions = new HashMap<>();
for (final String topic : recordsPerTopic.keySet()) { for (final String topic : recordsPerTopic.keySet()) {
final int records = recordsPerTopic.get(topic); final int records = recordsPerTopic.get(topic);
@ -449,7 +459,6 @@ public class ConsumerPool implements Closeable {
.explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions) .explanation("With the configured Record Reader, failed to parse at least one Record. Failures per topic: " + failureDescriptions)
.build(); .build();
} }
}
/** /**
* Obtains a consumer from the pool if one is available or lazily * Obtains a consumer from the pool if one is available or lazily