NIFI-4008: This closes #2189. Update ConsumeKafkaRecord 0.11 so that it can consume multiple records from a single Kafka message

NIFI-4008: Ensure that we always check if a Kafka message's value is null before dereferencing it

Signed-off-by: joewitt <joewitt@apache.org>
Authored by Mark Payne on 2017-10-02 16:02:54 -04:00, committed by joewitt
parent 1d5df4a5ec
commit 582df7f4e8
2 changed files with 60 additions and 57 deletions
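Taken together, the two commit messages describe one reworked read path in ConsumerLease: guard against a Kafka message whose value is null, then keep pulling records from the configured Record Reader until it is exhausted, instead of taking only the first record per message. The sketch below is a minimal illustration of that shape and not the actual method; consumeMessage and processRecord are hypothetical stand-ins, while readerFactory, session, logger, and handleParseFailure refer to the surrounding ConsumerLease members visible in the diffs that follow.

    // Minimal sketch of the pattern applied in the diffs below (hypothetical helper, not the real method).
    private void consumeMessage(final ConsumerRecord<byte[], byte[]> consumerRecord,
                                final Map<String, String> attributes) throws Exception {
        // NIFI-4008: never dereference a null Kafka value; fall back to an empty payload.
        final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();

        try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
            final RecordReader reader;
            try {
                reader = readerFactory.createRecordReader(attributes, in, logger);
            } catch (final Exception e) {
                handleParseFailure(consumerRecord, session, e); // route the raw message to 'parse.failure'
                return;
            }

            // NIFI-4008: a single Kafka message may carry many records; drain the reader rather than reading once.
            Record record;
            while ((record = reader.nextRecord()) != null) {
                processRecord(record); // hypothetical stand-in for the bundle/writer handling shown below
            }
        }
    }

Defaulting to an empty byte array keeps the try-with-resources structure intact rather than adding a separate branch for null-valued messages.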

View File

@@ -446,7 +446,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
         attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());

         FlowFile failureFlowFile = session.create();
-        failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+        if (consumerRecord.value() != null) {
+            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+        }
         failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);

         final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
@@ -461,7 +463,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
         try {
             for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
-                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
+                final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
                     final RecordReader reader;
                     final Record firstRecord;

View File

@@ -512,69 +512,69 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener
                 for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
                     final Map<String, String> attributes = getAttributes(consumerRecord);

-                    final Record record;
-                    try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                        final RecordReader reader = readerFactory.createRecordReader(attributes, in, logger);
-                        record = reader.nextRecord();
-                    } catch (final Exception e) {
-                        handleParseFailure(consumerRecord, session, e);
-                        continue;
-                    }
-
-                    if (record == null) {
-                        handleParseFailure(consumerRecord, session, null);
-                        continue;
-                    }
-
-                    // Determine the bundle for this record.
-                    final RecordSchema recordSchema = record.getSchema();
-                    final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
-
-                    BundleTracker tracker = bundleMap.get(bundleInfo);
-                    if (tracker == null) {
-                        FlowFile flowFile = session.create();
-                        flowFile = session.putAllAttributes(flowFile, attributes);
-
-                        final OutputStream rawOut = session.write(flowFile);
-
-                        final RecordSchema writeSchema;
-                        try {
-                            writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
-                        } catch (final Exception e) {
-                            logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
-                            try {
-                                rollback(topicPartition);
-                            } catch (final Exception rollbackException) {
-                                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                            }
-
-                            yield();
-                            throw new ProcessException(e);
-                        }
-
-                        writer = writerFactory.createWriter(logger, writeSchema, rawOut);
-                        writer.beginRecordSet();
-
-                        tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
-                        tracker.updateFlowFile(flowFile);
-                        bundleMap.put(bundleInfo, tracker);
-                    } else {
-                        writer = tracker.recordWriter;
-                    }
-
-                    try {
-                        writer.write(record);
-                    } catch (final RuntimeException re) {
-                        handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                        continue;
-                    }
-
-                    tracker.incrementRecordCount(1L);
-                }
-
-                session.adjustCounter("Records Received", records.size(), false);
+                    final byte[] recordBytes = consumerRecord.value() == null ? new byte[0] : consumerRecord.value();
+                    try (final InputStream in = new ByteArrayInputStream(recordBytes)) {
+                        final RecordReader reader;
+
+                        try {
+                            reader = readerFactory.createRecordReader(attributes, in, logger);
+                        } catch (final Exception e) {
+                            handleParseFailure(consumerRecord, session, e);
+                            continue;
+                        }
+
+                        Record record;
+                        while ((record = reader.nextRecord()) != null) {
+                            // Determine the bundle for this record.
+                            final RecordSchema recordSchema = record.getSchema();
+                            final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+                            BundleTracker tracker = bundleMap.get(bundleInfo);
+                            if (tracker == null) {
+                                FlowFile flowFile = session.create();
+                                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                                final OutputStream rawOut = session.write(flowFile);
+
+                                final RecordSchema writeSchema;
+                                try {
+                                    writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+                                } catch (final Exception e) {
+                                    logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                                    try {
+                                        rollback(topicPartition);
+                                    } catch (final Exception rollbackException) {
+                                        logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+                                    }
+
+                                    yield();
+                                    throw new ProcessException(e);
+                                }
+
+                                writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+                                writer.beginRecordSet();
+
+                                tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+                                tracker.updateFlowFile(flowFile);
+                                bundleMap.put(bundleInfo, tracker);
+                            } else {
+                                writer = tracker.recordWriter;
+                            }
+
+                            try {
+                                writer.write(record);
+                            } catch (final RuntimeException re) {
+                                handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+                                    + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+                                continue;
+                            }
+
+                            tracker.incrementRecordCount(1L);
+                            session.adjustCounter("Records Received", records.size(), false);
+                        }
+                    }
+                }
             } catch (final Exception e) {
                 logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);