NIFI-4008: Allow 0 or more records within a message. This closes #1891.

Authored by Koji Kawamura on 2017-06-05 21:40:53 +09:00; committed by Mark Payne
parent 36b16c9cdd
commit 58e4fb576e
2 changed files with 82 additions and 87 deletions
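
As context for the diffs below, here is a minimal sketch of the read pattern this commit introduces: read the first record to decide whether the message produced anything at all, skip messages that yield no records, and otherwise keep calling nextRecord() until the reader is exhausted so that one Kafka message can contribute many records. This is illustrative only; the class name ZeroOrMoreRecordsSketch, the helper name writeRecordsFromMessage and its return convention are invented here, and the session, provenance, and bundle bookkeeping that ConsumerLease actually performs are omitted.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.record.Record;

// Illustrative only: class and method names are invented for this sketch.
class ZeroOrMoreRecordsSketch {

    // Writes every record found in one Kafka message. Returns the number of records written
    // (0 is valid: the message is simply skipped), or -1 if the message could not be parsed
    // and should be routed to 'parse.failure' by the caller.
    static long writeRecordsFromMessage(final ConsumerRecord<byte[], byte[]> consumerRecord,
            final RecordReaderFactory readerFactory, final RecordSetWriter writer, final ComponentLog logger) {
        long recordCount = 0;
        try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
            final RecordReader reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);

            // Read the first record up front: it decides whether there is anything to write at all
            // (and, in ConsumerLease, which schema/bundle the message belongs to).
            final Record firstRecord = reader.nextRecord();
            if (firstRecord == null) {
                return 0; // a message with zero records is no longer treated as a parse failure
            }

            // Drain the rest of the message: one Kafka message may carry many records.
            for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
                writer.write(record);
                recordCount++; // count records, not messages
            }
            return recordCount;
        } catch (final Exception e) {
            logger.error("Failed to parse message from Kafka using the configured Record Reader", e);
            return -1;
        }
    }
}

In the committed code the same pattern runs inside ConsumerLease.writeRecordData(): a message whose reader yields no records is now skipped instead of being routed to 'parse.failure', unparseable messages are still transferred whole to 'parse.failure', and the 'Records Received' counter is incremented once per record rather than once per consumed message.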

ConsumeKafkaRecord_0_10.java

@@ -55,11 +55,11 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;

 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 0.10.x Consumer API. "
-    + "The complementary NiFi processor for sending messages is PublishKafka_0_10. Please note that, at this time, the Processor assumes that "
+    + "The complementary NiFi processor for sending messages is PublishKafkaRecord_0_10. Please note that, at this time, the Processor assumes that "
     + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the "
     + "configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
     + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual messages within the single FlowFile. "
-    + "A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile.")
+    + "A 'record.count' attribute is added to indicate how many records are contained in the FlowFile.")
 @Tags({"Kafka", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),

ConsumerLease.java

@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;

 import javax.xml.bind.DatatypeConverter;
@@ -432,60 +433,54 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
         bundleMap.put(bundleInfo, tracker);
     }

-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
-        handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
-            + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-    }
-
-    private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
-        // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
-
-        FlowFile failureFlowFile = session.create();
-
-        final byte[] value = consumerRecord.value();
-        if (value != null) {
-            failureFlowFile = session.write(failureFlowFile, out -> out.write(value));
-        }
-        failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
-        session.getProvenanceReporter().receive(failureFlowFile, transitUri);
-        session.transfer(failureFlowFile, REL_PARSE_FAILURE);
-
-        if (cause == null) {
-            logger.error(message);
-        } else {
-            logger.error(message, cause);
-        }
-
-        session.adjustCounter("Parse Failures", 1, false);
-    }
-
-    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
-        RecordSetWriter writer = null;
+    private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> messages, final TopicPartition topicPartition) {
+        RecordSetWriter writer = null;
+
+        final BiConsumer<ConsumerRecord<byte[], byte[]>, Exception> handleParseFailure = (consumerRecord, e) -> {
+            // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+            // And continue to the next message.
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+            attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+            attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
+
+            FlowFile failureFlowFile = session.create();
+            failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+            failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+            final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+            session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+            session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+            logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+                + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
+
+            session.adjustCounter("Parse Failures", 1, false);
+        };
+
         try {
-            for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
-                final Record record;
-                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
-                    final RecordReader reader = readerFactory.createRecordReader(Collections.EMPTY_MAP, in, logger);
-                    record = reader.nextRecord();
-                } catch (final Exception e) {
-                    handleParseFailure(consumerRecord, session, e);
-                    continue;
-                }
-
-                if (record == null) {
-                    handleParseFailure(consumerRecord, session, null);
-                    continue;
-                }
+            for (final ConsumerRecord<byte[], byte[]> consumerRecord : messages) {
+                try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
+                    final RecordReader reader;
+                    final Record firstRecord;
+                    try {
+                        reader = readerFactory.createRecordReader(Collections.emptyMap(), in, logger);
+                        firstRecord = reader.nextRecord();
+                    } catch (final Exception e) {
+                        handleParseFailure.accept(consumerRecord, e);
+                        continue;
+                    }
+
+                    if (firstRecord == null) {
+                        // If the message doesn't contain any record, do nothing.
+                        continue;
+                    }

                 // Determine the bundle for this record.
-                final RecordSchema recordSchema = record.getSchema();
+                final RecordSchema recordSchema = firstRecord.getSchema();
                 final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema);
                 BundleTracker tracker = bundleMap.get(bundleInfo);
@@ -520,17 +515,17 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
                 }

                 try {
-                    writer.write(record);
-                } catch (final RuntimeException re) {
-                    handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
-                        + "Will route message as its own FlowFile to the 'parse.failure' relationship");
-                    continue;
-                }
-
-                tracker.incrementRecordCount(1L);
-            }
-
-            session.adjustCounter("Records Received", records.size(), false);
+                    for (Record record = firstRecord; record != null; record = reader.nextRecord()) {
+                        writer.write(record);
+                        tracker.incrementRecordCount(1L);
+                        session.adjustCounter("Records Received", 1, false);
+                    }
+                } catch (Exception e) {
+                    // Transfer it to 'parse failure' and continue to the next message.
+                    handleParseFailure.accept(consumerRecord, e);
+                }
+                }
+            }
         } catch (final Exception e) {
             logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);