NIFI-4046: If we are unable to parse out any records from a Kafka message with ConsumeKafkaRecord, then we should route all of the bytes received to 'parse.failure'

NIFI-4046: Addressed issue of Record Writer failing with ConsumeKafkaRecord

This closes #1906.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
Mark Payne 2017-06-09 09:02:08 -04:00 committed by Koji Kawamura
parent 58a623dfa2
commit cdc154f7c8
1 changed file with 53 additions and 21 deletions


@@ -347,13 +347,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private Collection<FlowFile> getBundles() throws IOException {
         final List<FlowFile> flowFiles = new ArrayList<>();
         for (final BundleTracker tracker : bundleMap.values()) {
-            processBundle(tracker);
-            flowFiles.add(tracker.flowFile);
+            final boolean includeBundle = processBundle(tracker);
+            if (includeBundle) {
+                flowFiles.add(tracker.flowFile);
+            }
         }
         return flowFiles;
     }
 
-    private void processBundle(final BundleTracker bundle) throws IOException {
+    private boolean processBundle(final BundleTracker bundle) throws IOException {
         final RecordSetWriter writer = bundle.recordWriter;
         if (writer != null) {
             final WriteResult writeResult;
@@ -364,6 +366,11 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             writer.close();
         }
 
+        if (writeResult.getRecordCount() == 0) {
+            getProcessSession().remove(bundle.flowFile);
+            return false;
+        }
+
         final Map<String, String> attributes = new HashMap<>();
         attributes.putAll(writeResult.getAttributes());
         attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@@ -372,6 +379,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         }
 
         populateAttributes(bundle);
+        return true;
     }
 
     private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
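
Aside: the net effect of the hunks above is that a bundle whose writer produced zero records is now removed rather than transferred downstream as an empty FlowFile. A minimal, self-contained sketch of that contract (plain Java; Bundle and the String stand-in for FlowFile are hypothetical types, not NiFi's API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class EmptyBundleFilter {

    static final class Bundle {
        final String flowFile;   // stand-in for org.apache.nifi.flowfile.FlowFile
        final int recordCount;   // what WriteResult.getRecordCount() would report

        Bundle(final String flowFile, final int recordCount) {
            this.flowFile = flowFile;
            this.recordCount = recordCount;
        }
    }

    // Mirrors getBundles()/processBundle(): only bundles with at least one
    // record are collected; empty ones are dropped (NiFi would also call
    // getProcessSession().remove(bundle.flowFile) on them).
    static List<String> getBundles(final List<Bundle> bundles) {
        final List<String> flowFiles = new ArrayList<>();
        for (final Bundle bundle : bundles) {
            final boolean includeBundle = bundle.recordCount > 0;
            if (includeBundle) {
                flowFiles.add(bundle.flowFile);
            }
        }
        return flowFiles;
    }

    public static void main(final String[] args) {
        final List<Bundle> bundles = Arrays.asList(new Bundle("a", 3), new Bundle("b", 0));
        System.out.println(getBundles(bundles));   // prints [a]
    }
}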
@@ -417,6 +425,35 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         bundleMap.put(bundleInfo, tracker);
     }
 
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+    }
+
+    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+        FlowFile failureFlowFile = session.create();
+        failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+        if (cause == null) {
+            logger.error(message);
+        } else {
+            logger.error(message, cause);
+        }
+
+        session.adjustCounter("Parse Failures", 1, false);
+    }
+
     private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
         // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
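
Aside: every FlowFile routed to 'parse.failure' by the new helper carries its source position as attributes (KafkaProcessorUtils.KAFKA_TOPIC, KAFKA_PARTITION, and KAFKA_OFFSET resolve to the keys kafka.topic, kafka.partition, and kafka.offset). A hedged sketch of how downstream code might recover that position; it assumes only the standard org.apache.nifi.flowfile.FlowFile interface on the classpath, and KafkaFailurePosition itself is a hypothetical helper:

import org.apache.nifi.flowfile.FlowFile;

final class KafkaFailurePosition {
    final String topic;
    final int partition;
    final long offset;

    KafkaFailurePosition(final FlowFile failureFlowFile) {
        // Keys match what handleParseFailure() puts on the FlowFile.
        this.topic = failureFlowFile.getAttribute("kafka.topic");
        this.partition = Integer.parseInt(failureFlowFile.getAttribute("kafka.partition"));
        this.offset = Long.parseLong(failureFlowFile.getAttribute("kafka.offset"));
    }

    @Override
    public String toString() {
        return topic + "-" + partition + "@" + offset;
    }
}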
@@ -433,24 +470,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
                     record = reader.nextRecord();
                 } catch (final Exception e) {
-                    // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
-                    final Map<String, String> attributes = new HashMap<>();
-                    attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-                    attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
-                    attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-                    FlowFile failureFlowFile = session.create();
-                    failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
-                    failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
-                    session.getProvenanceReporter().receive(failureFlowFile, transitUri);
-                    session.transfer(failureFlowFile, REL_PARSE_FAILURE);
-                    logger.error("Failed to parse message from Kafka using the configured Record Reader. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
-                    session.adjustCounter("Parse Failures", 1, false);
+                    handleParseFailure(consumerRecord, session, e);
                     continue;
                 }
 
                 if (record == null) {
+                    handleParseFailure(consumerRecord, session, null);
                     continue;
                 }
@@ -489,7 +514,14 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     writer = tracker.recordWriter;
                 }
 
-                writer.write(record);
+                try {
+                    writer.write(record);
+                } catch (final RuntimeException re) {
+                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                    continue;
+                }
+
                 tracker.incrementRecordCount(1L);
             }
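
Taken together, the loop in writeRecordData now has three paths into 'parse.failure': the reader throws, the reader returns no record, or the writer throws a RuntimeException. A minimal, self-contained model of that control flow (plain Java; Parser, Writer, and Route are hypothetical stand-ins, not NiFi's API, and the real method handles many records per Kafka message rather than one):

class ParseFailureRouting {

    enum Route { SUCCESS, PARSE_FAILURE }

    interface Parser { Object parse(byte[] message) throws Exception; }
    interface Writer { void write(Object record); }

    // Mirrors the per-message decision in writeRecordData() after this commit.
    static Route routeMessage(final byte[] message, final Parser parser, final Writer writer) {
        final Object record;
        try {
            record = parser.parse(message);
        } catch (final Exception e) {
            return Route.PARSE_FAILURE;   // reader threw: raw bytes routed to 'parse.failure'
        }

        if (record == null) {
            return Route.PARSE_FAILURE;   // no record could be parsed: same routing (NIFI-4046)
        }

        try {
            writer.write(record);
        } catch (final RuntimeException re) {
            return Route.PARSE_FAILURE;   // writer failed (e.g. schema mismatch): same routing
        }
        return Route.SUCCESS;
    }

    public static void main(final String[] args) {
        final Parser nullParser = message -> null;    // parses nothing
        final Writer noopWriter = record -> { };
        System.out.println(routeMessage(new byte[0], nullParser, noopWriter)); // PARSE_FAILURE
    }
}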