NIFI-3953: This closes #1837. Allow multiple schemas on same kafka topic/partition for ConsumeKafkaRecord_0_10

Also updated the record writers to ensure that they write the schema as appropriate if not using a RecordSet. Updated ConsumeKafkaRecord to allow multiple schemas to be on the same topic and partition.

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Mark Payne 2017-05-22 12:02:18 -04:00 committed by joewitt
parent 6d16fdf170
commit 6937a6cf64
6 changed files with 124 additions and 89 deletions
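
At a high level, the ConsumerLease change stops assuming that every message returned by a single poll shares one schema: each Kafka message is parsed on its own and appended to a per-schema FlowFile/RecordSetWriter pair, so one poll can safely mix schemas. The following is a minimal, NiFi-free sketch of that grouping idea; the Rec class and string keys are hypothetical stand-ins for NiFi's Record and RecordSchema types, not the actual API.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SchemaGroupingSketch {

    // Hypothetical stand-in for a parsed Kafka message: a schema identifier plus its payload.
    static class Rec {
        final String schemaName;
        final String payload;

        Rec(final String schemaName, final String payload) {
            this.schemaName = schemaName;
            this.payload = payload;
        }
    }

    public static void main(final String[] args) {
        final List<Rec> consumed = List.of(
                new Rec("schemaA", "{\"a\":1}"),
                new Rec("schemaB", "x,y,z"),
                new Rec("schemaA", "{\"a\":2}"));

        // Mirrors the Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> in ConsumerLease:
        // records that share a schema are appended to the same bucket.
        final Map<String, List<String>> outputsBySchema = new HashMap<>();
        for (final Rec rec : consumed) {
            outputsBySchema.computeIfAbsent(rec.schemaName, k -> new ArrayList<>()).add(rec.payload);
        }

        // Each bucket would then become its own FlowFile, finished and transferred independently.
        outputsBySchema.forEach((schema, payloads) ->
                System.out.println(schema + " -> " + payloads.size() + " record(s)"));
    }
}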

View File

@@ -68,7 +68,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
return recordCount;
}
- protected final boolean isRecordSetActive() {
+ protected final boolean isActiveRecordSet() {
return activeRecordSet;
}
@@ -84,7 +84,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
@Override
public final WriteResult finishRecordSet() throws IOException {
- if (!isRecordSetActive()) {
+ if (!isActiveRecordSet()) {
throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
}

View File

@@ -21,22 +21,18 @@ import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.RE
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.DatatypeConverter;
@@ -57,11 +53,10 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.Tuple;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -411,12 +406,55 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
return;
}
FlowFile flowFile = session.create();
try {
final RecordSchema schema;
final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
// In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
// We don't want to create a new FlowFile for each record that we receive, so we will just create
// a "temporary flowfile" that will be removed in the finally block below and use that to pass to
// the createRecordReader method.
final FlowFile tempFlowFile = session.create();
try {
schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
while (itr.hasNext()) {
final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
final InputStream in = new ByteArrayInputStream(consumerRecord.value());
final Record record;
try {
final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
record = reader.nextRecord();
} catch (final Exception e) {
// If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
final Map<String, String> attributes = new HashMap<>();
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
session.getProvenanceReporter().receive(failureFlowFile, transitUri);
session.transfer(failureFlowFile, REL_PARSE_FAILURE);
logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
session.adjustCounter("Parse Failures", 1, false);
continue;
}
final RecordSchema recordSchema = record.getSchema();
Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
if (tuple == null) {
FlowFile flowFile = session.create();
final OutputStream rawOut = session.write(flowFile);
final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
@@ -430,58 +468,18 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
throw new ProcessException(e);
}
final FlowFile ff = flowFile;
final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
writer.beginRecordSet();
flowFile = session.write(flowFile, rawOut -> {
final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
final RecordSet recordSet = new RecordSet() {
@Override
public RecordSchema getSchema() throws IOException {
return emptySchema;
tuple = new Tuple<>(flowFile, writer);
writers.put(recordSchema, tuple);
}
@Override
public Record next() throws IOException {
while (itr.hasNext()) {
final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
final InputStream in = new ByteArrayInputStream(consumerRecord.value());
try {
final RecordReader reader = readerFactory.createRecordReader(ff, in, logger);
final Record record = reader.nextRecord();
return record;
final RecordSetWriter writer = tuple.getValue();
writer.write(record);
}
} catch (final Exception e) {
final Map<String, String> attributes = new HashMap<>();
attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
FlowFile failureFlowFile = session.create();
failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
session.transfer(failureFlowFile, REL_PARSE_FAILURE);
logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
session.adjustCounter("Parse Failures", 1, false);
}
}
return null;
}
};
try (final OutputStream out = new BufferedOutputStream(rawOut);
final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) {
writeResult.set(writer.write(recordSet));
mimeTypeRef.set(writer.getMimeType());
} catch (final Exception e) {
logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e);
logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
try {
rollback(topicPartition);
@@ -489,16 +487,35 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
yield();
throw new ProcessException(e);
} finally {
session.remove(tempFlowFile);
}
for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
FlowFile flowFile = tuple.getKey();
final RecordSetWriter writer = tuple.getValue();
final WriteResult writeResult;
try {
writeResult = writer.finishRecordSet();
writer.close();
} catch (final Exception e) {
logger.error("Failed to finish writing records to Content Repository", e);
try {
rollback(topicPartition);
} catch (final Exception rollbackException) {
logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
throw new ProcessException(e);
}
});
final WriteResult result = writeResult.get();
if (result.getRecordCount() > 0) {
final Map<String, String> attributes = new HashMap<>(result.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
attributes.put("record.count", String.valueOf(result.getRecordCount()));
final int recordCount = writeResult.getRecordCount();
if (recordCount > 0) {
final Map<String, String> attributes = new HashMap<>();
attributes.putAll(writeResult.getAttributes());
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
attributes.put("record.count", String.valueOf(recordCount));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
@@ -509,17 +526,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
session.adjustCounter("Records Received", result.getRecordCount(), false);
session.adjustCounter("Records Received", recordCount, false);
session.transfer(flowFile, REL_SUCCESS);
} else {
session.remove(flowFile);
}
} catch (final Exception e) {
session.remove(flowFile);
throw e;
}
}
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
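
Because each message is now parsed individually, one unreadable record no longer fails the whole batch: it is written to its own FlowFile, annotated with its Kafka coordinates, and routed to the 'parse.failure' relationship while the rest of the poll proceeds. A simplified sketch of that attribute bookkeeping follows; the literal keys and the helper name are assumptions standing in for the KafkaProcessorUtils constants used above.

import java.util.HashMap;
import java.util.Map;

public class ParseFailureSketch {

    // Builds the attributes attached to a message that could not be parsed, before the
    // message is emitted as its own FlowFile on the parse-failure relationship.
    static Map<String, String> failureAttributes(final String topic, final int partition, final long offset) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("kafka.topic", topic);
        attributes.put("kafka.partition", String.valueOf(partition));
        attributes.put("kafka.offset", String.valueOf(offset));
        return attributes;
    }

    public static void main(final String[] args) {
        // In the processor, these attributes go onto a new FlowFile that is transferred to
        // REL_PARSE_FAILURE and counted under the "Parse Failures" counter.
        System.out.println(failureAttributes("events", 3, 42L));
    }
}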

View File

@@ -112,6 +112,7 @@ public class PublisherLease implements Closeable {
while ((record = recordSet.next()) != null) {
recordCount++;
baos.reset();
writer.write(record);
writer.flush();

View File

@@ -50,7 +50,6 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
this.buffered = new BufferedOutputStream(out);
datumWriter = new GenericDatumWriter<>(avroSchema);
schemaAccessWriter.writeHeader(recordSchema, buffered);
encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
}
@@ -67,6 +66,13 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
flush();
schemaAccessWriter.writeHeader(recordSchema, getOutputStream());
}
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
datumWriter.write(rec, encoder);
return schemaAccessWriter.getAttributes(recordSchema);

View File

@@ -95,6 +95,12 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
schemaWriter.writeHeader(recordSchema, getOutputStream());
}
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));

View File

@@ -104,6 +104,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
// If we are not writing an active record set, then we need to ensure that we write the
// schema information.
if (!isActiveRecordSet()) {
generator.flush();
schemaAccess.writeHeader(recordSchema, getOutputStream());
}
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
return schemaAccess.getAttributes(recordSchema);
}
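
The Avro, CSV, and JSON writers above all add the same guard: when writeRecord() is called outside of an active record set, the schema header is written first so that a standalone record is still self-describing. A minimal sketch of that pattern follows, using plain streams in place of the NiFi writer and schema-access interfaces; the class and header format are illustrative only.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class HeaderGuardSketch {

    private final OutputStream out;
    private boolean activeRecordSet = false;

    public HeaderGuardSketch(final OutputStream out) {
        this.out = out;
    }

    public void beginRecordSet() throws IOException {
        activeRecordSet = true;
        writeHeader();  // a record set always starts with the schema header
    }

    public void writeRecord(final String record) throws IOException {
        if (!activeRecordSet) {
            // Mirrors the "if (!isActiveRecordSet()) { ... writeHeader(...); }" guard added above.
            writeHeader();
        }
        out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }

    private void writeHeader() throws IOException {
        out.write("schema: example\n".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(final String[] args) throws IOException {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        final HeaderGuardSketch writer = new HeaderGuardSketch(buffer);
        writer.writeRecord("{\"id\": 1}");  // no beginRecordSet(), the header is still written
        System.out.print(buffer.toString(StandardCharsets.UTF_8));
    }
}

In the real writers the header is delegated to a SchemaAccessWriter, and the Avro and JSON variants flush their encoder or generator first so the header lands ahead of any buffered output.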