mirror of https://github.com/apache/nifi.git
NIFI-3739: This closes #1728. Pass the proper InputStream to RecordSetWriterFactory in order to obtain RecordSetWriter; also fix error handling so that we don't kill kafka client if unable to create writer, since we roll back the offsets
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
b96e402e78
commit
40de1b18d9
|
@ -171,6 +171,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
|
||||||
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
|
final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
|
||||||
lastPollEmpty = records.count() == 0;
|
lastPollEmpty = records.count() == 0;
|
||||||
processRecords(records);
|
processRecords(records);
|
||||||
|
} catch (final ProcessException pe) {
|
||||||
|
throw pe;
|
||||||
} catch (final Throwable t) {
|
} catch (final Throwable t) {
|
||||||
this.poison();
|
this.poison();
|
||||||
throw t;
|
throw t;
|
||||||
|
@ -405,11 +407,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
|
||||||
}
|
}
|
||||||
|
|
||||||
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
|
private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
|
||||||
|
if (records.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
FlowFile flowFile = session.create();
|
FlowFile flowFile = session.create();
|
||||||
try {
|
try {
|
||||||
final RecordSetWriter writer;
|
final RecordSetWriter writer;
|
||||||
try {
|
try {
|
||||||
writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(new byte[0]));
|
writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(records.get(0).value()));
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
logger.error(
|
logger.error(
|
||||||
"Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "
|
"Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the "
|
||||||
|
|
Loading…
Reference in New Issue