From 30facedc43c5e859f8a82318bb4340d99654eab2 Mon Sep 17 00:00:00 2001
From: greyp9
Date: Wed, 16 Nov 2022 16:07:43 -0500
Subject: [PATCH] NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka
record key (#6131)
* NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
* publisher wrapper record; property naming and display order; doc updates
---
.../nifi-kafka-2-6-processors/pom.xml | 12 +
.../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 58 +-
.../kafka/pubsub/ConsumerLease.java | 208 ++++-
.../processors/kafka/pubsub/ConsumerPool.java | 28 +-
.../kafka/pubsub/KafkaProcessorUtils.java | 12 +
.../kafka/pubsub/OutputStrategy.java | 25 +
.../kafka/pubsub/PublishKafkaRecord_2_6.java | 53 +-
.../kafka/pubsub/PublishKafka_2_6.java | 3 +-
.../kafka/pubsub/PublishMetadataStrategy.java | 23 +
.../kafka/pubsub/PublishStrategy.java | 25 +
.../kafka/pubsub/PublisherLease.java | 226 ++++-
.../kafka/pubsub/PublisherPool.java | 11 +-
.../additionalDetails.html | 127 +++
.../additionalDetails.html | 883 ++++++++++++++++++
.../kafka/pubsub/TestConsumeKafkaMock.java | 517 ++++++++++
.../pubsub/TestConsumeKafkaRecordKey_2_6.java | 103 ++
.../kafka/pubsub/TestPublishKafkaMock.java | 372 ++++++++
.../TestPublishKafkaMockParameterized.java | 289 ++++++
.../pubsub/TestPublishKafkaRecordKey_2_6.java | 102 ++
.../pubsub/TestPublishKafkaRecord_2_6.java | 26 +-
.../kafka/pubsub/TestPublisherLease.java | 4 +-
.../kafka/pubsub/TestPublisherPool.java | 4 +-
.../parameterized/flowfileInput1.json | 8 +
.../parameterized/flowfileInputA.json | 12 +
.../parameterized/kafkaOutput1V.json | 21 +
.../parameterized/kafkaOutput1W.json | 5 +
.../parameterized/kafkaOutputAV.json | 24 +
.../parameterized/kafkaOutputAW.json | 13 +
28 files changed, 3091 insertions(+), 103 deletions(-)
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/OutputStrategy.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishMetadataStrategy.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishStrategy.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaMock.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecordKey_2_6.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMock.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordKey_2_6.java
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInput1.json
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputA.json
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutput1V.json
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutput1W.json
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputAV.json
create mode 100644 nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputAW.json
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
index b98416c3ff..0dde708e9e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/pom.xml
@@ -100,6 +100,18 @@
            <version>1.19.0-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-services</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+            <scope>test</scope>
+        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
index 7f1f032df9..2836e64556 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java
@@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
@@ -37,7 +38,6 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -62,6 +62,11 @@ import java.util.regex.Pattern;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_BYTE_ARRAY;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_RECORD;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_STRING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.OUTPUT_USE_VALUE;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.OUTPUT_USE_WRAPPER;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
@@ -114,7 +119,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
- .displayName("Record Reader")
+ .displayName("Value Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -123,7 +128,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("record-writer")
- .displayName("Record Writer")
+ .displayName("Record Value Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.defaultValue("UTF-8")
.required(false)
.build();
+ static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
+ .name("output-strategy")
+ .displayName("Output Strategy")
+ .description("The format used to output the Kafka record into a FlowFile record.")
+ .required(true)
+ .defaultValue(OUTPUT_USE_VALUE.getValue())
+ .allowableValues(OUTPUT_USE_VALUE, OUTPUT_USE_WRAPPER)
+ .build();
+ static final PropertyDescriptor KEY_FORMAT = new PropertyDescriptor.Builder()
+ .name("key-format")
+ .displayName("Key Format")
+ .description("Specifies how to represent the Kafka Record's Key in the output")
+ .required(true)
+ .defaultValue(KEY_AS_BYTE_ARRAY.getValue())
+ .allowableValues(KEY_AS_STRING, KEY_AS_BYTE_ARRAY, KEY_AS_RECORD)
+ .dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_WRAPPER)
+ .build();
+ static final PropertyDescriptor KEY_RECORD_READER = new PropertyDescriptor.Builder()
+ .name("key-record-reader")
+ .displayName("Key Record Reader")
+ .description("The Record Reader to use for parsing the Kafka Record's key into a Record")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .dependsOn(KEY_FORMAT, KEY_AS_RECORD)
+ .build();
static final PropertyDescriptor HEADER_NAME_REGEX = new Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
@@ -224,6 +254,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
+ .dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_VALUE)
.build();
static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
@@ -242,6 +273,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.required(true)
.defaultValue(UTF8_ENCODING.getValue())
.allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
+ .dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_VALUE)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -268,6 +300,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(GROUP_ID);
+ descriptors.add(OUTPUT_STRATEGY);
+ descriptors.add(HEADER_NAME_REGEX);
+ descriptors.add(KEY_ATTRIBUTE_ENCODING);
+ descriptors.add(KEY_FORMAT);
+ descriptors.add(KEY_RECORD_READER);
descriptors.add(COMMIT_OFFSETS);
descriptors.add(MAX_UNCOMMITTED_TIME);
descriptors.add(HONOR_TRANSACTIONS);
@@ -282,10 +319,8 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(SEPARATE_BY_KEY);
- descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MESSAGE_HEADER_ENCODING);
- descriptors.add(HEADER_NAME_REGEX);
descriptors.add(MAX_POLL_RECORDS);
descriptors.add(COMMS_TIMEOUT);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -404,8 +439,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
+ final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
+ final String keyFormat = context.getProperty(KEY_FORMAT).getValue();
+ final RecordReaderFactory keyReaderFactory = context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class);
+
final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
- final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
+ final boolean isActiveHeaderNamePattern = (OutputStrategy.USE_VALUE.equals(outputStrategy) && (headerNameRegex != null));
+ final Pattern headerNamePattern = isActiveHeaderNamePattern ? Pattern.compile(headerNameRegex) : null;
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
@@ -426,11 +466,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume, commitOffsets);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
+ commitOffsets, outputStrategy, keyFormat, keyReaderFactory);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
- bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume, commitOffsets);
+ bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
+ commitOffsets, outputStrategy, keyFormat, keyReaderFactory);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;
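For orientation, the sketch below shows a test-style configuration of the new consumer properties. It is illustrative only and not part of the patch: the JSON reader/writer services come from nifi-record-serialization-services (added above as a test dependency), the service identifiers are arbitrary, the usual broker/topic/group settings are omitted, and exception handling for addControllerService is left out.

    final TestRunner runner = TestRunners.newTestRunner(ConsumeKafkaRecord_2_6.class);

    // Value Record Reader / Record Value Writer (the renamed display names above)
    final JsonTreeReader valueReader = new JsonTreeReader();
    runner.addControllerService("value-reader", valueReader);
    runner.enableControllerService(valueReader);
    runner.setProperty(ConsumeKafkaRecord_2_6.RECORD_READER, "value-reader");

    final JsonRecordSetWriter valueWriter = new JsonRecordSetWriter();
    runner.addControllerService("value-writer", valueWriter);
    runner.enableControllerService(valueWriter);
    runner.setProperty(ConsumeKafkaRecord_2_6.RECORD_WRITER, "value-writer");

    // New properties: wrap each Kafka message and parse its key as a Record
    runner.setProperty(ConsumeKafkaRecord_2_6.OUTPUT_STRATEGY, "USE_WRAPPER");
    runner.setProperty(ConsumeKafkaRecord_2_6.KEY_FORMAT, "record");

    final JsonTreeReader keyReader = new JsonTreeReader();
    runner.addControllerService("key-reader", keyReader);
    runner.enableControllerService(keyReader);
    runner.setProperty(ConsumeKafkaRecord_2_6.KEY_RECORD_READER, "key-reader");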
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 6323735767..d0bd7ee52b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,15 +31,21 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.Tuple;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
@@ -75,6 +82,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
* lease may only belong to a single thread at a time.
*/
public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+ private static final RecordField EMPTY_SCHEMA_KEY_RECORD_FIELD =
+ new RecordField("key", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Collections.emptyList())));
private final Long maxWaitMillis;
private final Consumer kafkaConsumer;
@@ -89,6 +98,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private final Pattern headerNamePattern;
private final boolean separateByKey;
private final boolean commitOffsets;
+ private final OutputStrategy outputStrategy;
+ private final String keyFormat;
+ private final RecordReaderFactory keyReaderFactory;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
@@ -111,7 +123,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey,
- final boolean commitMessageOffsets) {
+ final boolean commitMessageOffsets,
+ final OutputStrategy outputStrategy,
+ final String keyFormat,
+ final RecordReaderFactory keyReaderFactory) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
@@ -125,6 +140,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.commitOffsets = commitMessageOffsets;
+ this.outputStrategy = outputStrategy;
+ this.keyFormat = keyFormat;
+ this.keyReaderFactory = keyReaderFactory;
}
/**
@@ -378,9 +396,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} else if (readerFactory != null && writerFactory != null) {
writeRecordData(getProcessSession(), messages, partition);
} else {
- messages.forEach(message -> {
- writeData(getProcessSession(), message, partition);
- });
+ messages.forEach(message -> writeData(getProcessSession(), message, partition));
}
totalMessages += messages.size();
@@ -447,9 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
tracker.incrementRecordCount(1, record.offset(), record.leaderEpoch().orElse(null));
final byte[] value = record.value();
if (value != null) {
- flowFile = session.write(flowFile, out -> {
- out.write(value);
- });
+ flowFile = session.write(flowFile, out -> out.write(value));
}
flowFile = session.putAllAttributes(flowFile, getAttributes(record));
tracker.updateFlowFile(flowFile);
@@ -580,7 +594,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
try {
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException e) {
- yield();
+ this.yield();
rollback(topicPartition);
handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
closeWriter(writer);
@@ -591,55 +605,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
try {
+ int recordCount = 0;
Record record;
while ((record = reader.nextRecord()) != null) {
- // Determine the bundle for this record.
- final RecordSchema recordSchema = record.getSchema();
- final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
-
- BundleTracker tracker = bundleMap.get(bundleInfo);
- if (tracker == null) {
- FlowFile flowFile = session.create();
- flowFile = session.putAllAttributes(flowFile, attributes);
-
- final OutputStream rawOut = session.write(flowFile);
-
- final RecordSchema writeSchema;
- try {
- writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
- } catch (final Exception e) {
- logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
-
- rollback(topicPartition);
- yield();
-
- throw new ProcessException(e);
- }
-
- writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
- writer.beginRecordSet();
-
- tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
- tracker.updateFlowFile(flowFile);
- bundleMap.put(bundleInfo, tracker);
- } else {
- writer = tracker.recordWriter;
+ if (OutputStrategy.USE_WRAPPER.equals(outputStrategy)) {
+ record = toWrapperRecord(consumerRecord, record);
}
-
- try {
- writer.write(record);
- } catch (final RuntimeException re) {
- handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
- + "Will route message as its own FlowFile to the 'parse.failure' relationship");
- continue;
- }
-
- tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
- session.adjustCounter("Records Received", 1L, false);
+ writer = writeRecord(session, consumerRecord, topicPartition, record, attributes);
+ ++recordCount;
+ }
+ if ((recordCount == 0) && (OutputStrategy.USE_WRAPPER.equals(outputStrategy))) {
+ // special processing of wrapper record with null value
+ writer = writeRecord(session, consumerRecord, topicPartition, toWrapperRecord(consumerRecord, null), attributes);
}
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
handleParseFailure(consumerRecord, session, e);
- continue;
}
}
}
@@ -653,6 +633,132 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
}
+ private RecordSetWriter writeRecord(final ProcessSession session, final ConsumerRecord<byte[], byte[]> consumerRecord, final TopicPartition topicPartition,
+ final Record record, final Map<String, String> attributes) throws SchemaNotFoundException, IOException {
+ // Determine the bundle for this record.
+ final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ final RecordSetWriter writer;
+ if (tracker == null) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+ rollback(topicPartition);
+ this.yield();
+
+ throw new ProcessException(e);
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
+ writer.beginRecordSet();
+
+ tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
+ }
+
+ try {
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ return writer;
+ }
+
+ tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
+ session.adjustCounter("Records Received", 1L, false);
+ return writer;
+ }
+
+ private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+ throws IOException, SchemaNotFoundException, MalformedRecordException {
+ final Tuple<RecordField, Object> tupleKey = toWrapperRecordKey(consumerRecord);
+ final Tuple<RecordField, Object> tupleValue = toWrapperRecordValue(record);
+ final Tuple<RecordField, Object> tupleHeaders = toWrapperRecordHeaders(consumerRecord);
+ final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+ final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+ tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+ final Map<String, Object> recordValues = new HashMap<>();
+ recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+ recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+ recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+ recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+ return new MapRecord(rootRecordSchema, recordValues);
+ }
+
+ private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+ throws IOException, SchemaNotFoundException, MalformedRecordException {
+
+ final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+ if (KafkaProcessorUtils.KEY_AS_RECORD.getValue().equals(keyFormat)) {
+ if (key.length == 0) {
+ return new Tuple<>(EMPTY_SCHEMA_KEY_RECORD_FIELD, null);
+ }
+
+ final Map<String, String> attributes = getAttributes(consumerRecord);
+ try (final InputStream is = new ByteArrayInputStream(key);
+ final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger)) {
+
+ final Record record = reader.nextRecord();
+ final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ return new Tuple<>(recordField, record);
+ }
+ } else if (KafkaProcessorUtils.KEY_AS_STRING.getValue().equals(keyFormat)) {
+ final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+ final String keyString = ((key == null) ? null : new String(key, StandardCharsets.UTF_8));
+ return new Tuple<>(recordField, keyString);
+ } else {
+ final RecordField recordField = new RecordField("key", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+ return new Tuple<>(recordField, ArrayUtils.toObject(key));
+ }
+ }
+
+ private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
+ final RecordSchema recordSchema = (record == null) ? null : record.getSchema();
+ final RecordField recordField = new RecordField("value", RecordFieldType.RECORD.getRecordDataType(recordSchema));
+ return new Tuple<>(recordField, record);
+ }
+
+ private Tuple<RecordField, Object> toWrapperRecordHeaders(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+ final RecordField recordField = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+ final Map<String, String> headers = new HashMap<>();
+ for (final Header header : consumerRecord.headers()) {
+ headers.put(header.key(), new String(header.value(), headerCharacterSet));
+ }
+ return new Tuple<>(recordField, headers);
+ }
+
+ private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
+ private static final RecordField FIELD_OFFSET = new RecordField("offset", RecordFieldType.LONG.getDataType());
+ private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
+ private static final RecordSchema SCHEMA_WRAPPER = new SimpleRecordSchema(Arrays.asList(
+ FIELD_TOPIC, FIELD_PARTITION, FIELD_OFFSET, FIELD_TIMESTAMP));
+
+ private Tuple<RecordField, Object> toWrapperRecordMetadata(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+ final RecordField recordField = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_WRAPPER));
+ final Map<String, Object> metadata = new HashMap<>();
+ metadata.put("topic", consumerRecord.topic());
+ metadata.put("partition", consumerRecord.partition());
+ metadata.put("offset", consumerRecord.offset());
+ metadata.put("timestamp", consumerRecord.timestamp());
+ final Record record = new MapRecord(SCHEMA_WRAPPER, metadata);
+ return new Tuple<>(recordField, record);
+ }
+
private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {
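With Output Strategy set to "Use Wrapper", each Kafka message becomes one FlowFile record carrying the four fields assembled by toWrapperRecord() above. A minimal sketch of how a downstream consumer of that output could unpack those fields (the helper name is hypothetical; only org.apache.nifi.serialization.record.Record is assumed):

    static void unpackWrapper(final Record wrapper) {
        final Object key = wrapper.getValue("key");               // Byte[], String, or nested Record, per the Key Format property
        final Object value = wrapper.getValue("value");           // the parsed message value; null when the Kafka value is null
        final Object headers = wrapper.getValue("headers");       // MAP of header name to String, decoded with the header charset
        final Record metadata = (Record) wrapper.getValue("metadata");
        final String topic = metadata.getAsString("topic");       // source topic
        final Integer partition = metadata.getAsInt("partition"); // source partition
        final Long offset = metadata.getAsLong("offset");         // message offset
        final Object timestamp = metadata.getValue("timestamp");  // message timestamp (TIMESTAMP field)
    }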
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index e8e031137c..2998f7f564 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -76,6 +76,9 @@ public class ConsumerPool implements Closeable {
private final boolean separateByKey;
private final int[] partitionsToConsume;
private final boolean commitOffsets;
+ private final OutputStrategy outputStrategy;
+ private final String keyFormat;
+ private final RecordReaderFactory keyReaderFactory;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@@ -135,6 +138,9 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
+ this.outputStrategy = null;
+ this.keyFormat = null;
+ this.keyReaderFactory = null;
enqueueAssignedPartitions(partitionsToConsume);
}
@@ -172,6 +178,9 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
+ this.outputStrategy = null;
+ this.keyFormat = null;
+ this.keyReaderFactory = null;
enqueueAssignedPartitions(partitionsToConsume);
}
@@ -191,7 +200,10 @@ public class ConsumerPool implements Closeable {
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume,
- final boolean commitOffsets) {
+ final boolean commitOffsets,
+ final OutputStrategy outputStrategy,
+ final String keyFormat,
+ final RecordReaderFactory keyReaderFactory) {
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -210,6 +222,9 @@ public class ConsumerPool implements Closeable {
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
+ this.outputStrategy = outputStrategy;
+ this.keyFormat = keyFormat;
+ this.keyReaderFactory = keyReaderFactory;
enqueueAssignedPartitions(partitionsToConsume);
}
@@ -229,7 +244,10 @@ public class ConsumerPool implements Closeable {
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume,
- final boolean commitOffsets) {
+ final boolean commitOffsets,
+ final OutputStrategy outputStrategy,
+ final String keyFormat,
+ final RecordReaderFactory keyReaderFactory) {
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@@ -248,6 +266,9 @@ public class ConsumerPool implements Closeable {
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
+ this.outputStrategy = outputStrategy;
+ this.keyFormat = keyFormat;
+ this.keyReaderFactory = keyReaderFactory;
enqueueAssignedPartitions(partitionsToConsume);
}
@@ -619,7 +640,8 @@ public class ConsumerPool implements Closeable {
private SimpleConsumerLease(final Consumer consumer, final List assignedPartitions) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
- readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey, commitOffsets);
+ readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey,
+ commitOffsets, outputStrategy, keyFormat, keyReaderFactory);
this.consumer = consumer;
this.assignedPartitions = assignedPartitions;
}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index 6dbacd213b..6da26b0bc6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -110,6 +110,18 @@ public final class KafkaProcessorUtils {
"When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
"For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
+ static final AllowableValue PUBLISH_USE_VALUE = new AllowableValue(PublishStrategy.USE_VALUE.name(),
+ "Use Content as Record Value", "Write only the FlowFile content to the Kafka Record value.");
+ static final AllowableValue PUBLISH_USE_WRAPPER = new AllowableValue(PublishStrategy.USE_WRAPPER.name(),
+ "Use Wrapper", "Write the Kafka Record key, value, headers, and metadata into the Kafka Record value. (See processor usage for more information.)");
+ static final AllowableValue OUTPUT_USE_VALUE = new AllowableValue(OutputStrategy.USE_VALUE.name(),
+ "Use Content as Value", "Write only the Kafka Record value to the FlowFile record.");
+ static final AllowableValue OUTPUT_USE_WRAPPER = new AllowableValue(OutputStrategy.USE_WRAPPER.name(),
+ "Use Wrapper", "Write the Kafka Record key, value, headers, and metadata into the FlowFile record. (See processor usage for more information.)");
+ static final AllowableValue KEY_AS_STRING = new AllowableValue("string", "String", "Format the Kafka ConsumerRecord key as a UTF-8 string.");
+ static final AllowableValue KEY_AS_BYTE_ARRAY = new AllowableValue("byte-array", "Byte Array", "Format the Kafka ConsumerRecord key as a byte array.");
+ static final AllowableValue KEY_AS_RECORD = new AllowableValue("record", "Record", "Format the Kafka ConsumerRecord key as a record.");
+
public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.displayName("Kafka Brokers")
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/OutputStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/OutputStrategy.java
new file mode 100644
index 0000000000..de0e9c1c25
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/OutputStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+/**
+ * Enumeration of strategies used by {@link ConsumeKafkaRecord_2_6} to map Kafka records to NiFi FlowFiles.
+ */
+public enum OutputStrategy {
+ USE_VALUE,
+ USE_WRAPPER;
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
index ea0d7b60d4..ab101f18a4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_6.java
@@ -83,6 +83,8 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.PUBLISH_USE_VALUE;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.PUBLISH_USE_WRAPPER;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.6"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.6 Producer API. "
@@ -125,6 +127,11 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
"Interprets the property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
"so all Records in a given FlowFile will go to the same partition.");
+ static final AllowableValue RECORD_METADATA_FROM_RECORD = new AllowableValue("Metadata From Record", "Metadata From Record", "The Kafka Record's Topic and Partition will be determined by " +
+ "looking at the /metadata/topic and /metadata/partition fields of the Record, respectively. If these fields are invalid or not present, the Topic Name and Partition/Partitioner class " +
+ "properties of the processor will be considered.");
+ static final AllowableValue RECORD_METADATA_FROM_PROPERTIES = new AllowableValue("Use Configured Values", "Use Configured Values", "The Kafka Record's Topic will be determined using the 'Topic " +
+ "Name' processor property. The partition will be determined using the 'Partition' and 'Partitioner class' properties.");
static final PropertyDescriptor TOPIC = new Builder()
.name("topic")
@@ -153,12 +160,22 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.required(true)
.build();
+ static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
+ .name("publish-strategy")
+ .displayName("Publish Strategy")
+ .description("The format used to publish the incoming FlowFile record to Kafka.")
+ .required(true)
+ .defaultValue(PUBLISH_USE_VALUE.getValue())
+ .allowableValues(PUBLISH_USE_VALUE, PUBLISH_USE_WRAPPER)
+ .build();
+
static final PropertyDescriptor MESSAGE_KEY_FIELD = new Builder()
.name("message-key-field")
.displayName("Message Key Field")
.description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
+ .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)
.required(false)
.build();
@@ -239,6 +256,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(NONE)
+ .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)
.required(false)
.build();
static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
@@ -271,6 +289,23 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.defaultValue("UTF-8")
.required(false)
.build();
+ static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder()
+ .name("record-key-writer")
+ .displayName("Record Key Writer")
+ .description("The Record Key Writer to use for outgoing FlowFiles")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_WRAPPER)
+ .build();
+ static final PropertyDescriptor RECORD_METADATA_STRATEGY = new Builder()
+ .name("Record Metadata Strategy")
+ .displayName("Record Metadata Strategy")
+ .description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured Topic Name and Partition / " +
+ "Partitioner class properties")
+ .required(true)
+ .allowableValues(RECORD_METADATA_FROM_PROPERTIES, RECORD_METADATA_FROM_RECORD)
+ .defaultValue(RECORD_METADATA_FROM_PROPERTIES.getValue())
+ .dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_WRAPPER)
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -298,6 +333,9 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
properties.add(TRANSACTIONAL_ID_PREFIX);
properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
properties.add(DELIVERY_GUARANTEE);
+ properties.add(PUBLISH_STRATEGY);
+ properties.add(RECORD_KEY_WRITER);
+ properties.add(RECORD_METADATA_STRATEGY);
properties.add(ATTRIBUTE_NAME_REGEX);
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
@@ -413,9 +451,11 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
Supplier transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
+ final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
+ final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
final Map kafkaProperties = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
@@ -423,7 +463,8 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
- return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, transactionalIdSupplier, attributeNamePattern, charset);
+ return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis,
+ useTransactions, transactionalIdSupplier, attributeNamePattern, charset, publishStrategy, recordKeyWriterFactory);
}
@OnStopped
@@ -455,6 +496,14 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
+ final PublishMetadataStrategy publishMetadataStrategy;
+ final String recordMetadataStrategy = context.getProperty(RECORD_METADATA_STRATEGY).getValue();
+ if (RECORD_METADATA_FROM_RECORD.getValue().equalsIgnoreCase(recordMetadataStrategy)) {
+ publishMetadataStrategy = PublishMetadataStrategy.USE_RECORD_METADATA;
+ } else {
+ publishMetadataStrategy = PublishMetadataStrategy.USE_CONFIGURED_VALUES;
+ }
+
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
try {
@@ -494,7 +543,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
- lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
+ lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner, publishMetadataStrategy);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}
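A corresponding publisher-side sketch, again illustrative rather than part of the patch (the JSON writer and service id are arbitrary, and the usual reader/writer/topic configuration is omitted): with Publish Strategy = "Use Wrapper" and Record Metadata Strategy = "Metadata From Record", each incoming FlowFile record is expected to carry key/value/headers/metadata fields, and /metadata/topic and /metadata/partition take precedence over the configured Topic Name and Partition properties.

    final TestRunner runner = TestRunners.newTestRunner(PublishKafkaRecord_2_6.class);
    runner.setProperty(PublishKafkaRecord_2_6.PUBLISH_STRATEGY, "USE_WRAPPER");
    runner.setProperty(PublishKafkaRecord_2_6.RECORD_METADATA_STRATEGY, "Metadata From Record");

    // Record Key Writer: used only when a record's "key" field is itself a Record
    final JsonRecordSetWriter keyWriter = new JsonRecordSetWriter();
    runner.addControllerService("key-writer", keyWriter);
    runner.enableControllerService(keyWriter);
    runner.setProperty(PublishKafkaRecord_2_6.RECORD_KEY_WRITER, "key-writer");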
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
index 985831c2b4..a468f8c475 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_6.java
@@ -401,7 +401,8 @@ public class PublishKafka_2_6 extends AbstractProcessor implements VerifiablePro
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
- return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, transactionalIdSupplier, attributeNamePattern, charset);
+ return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis,
+ useTransactions, transactionalIdSupplier, attributeNamePattern, charset, null, null);
}
@OnStopped
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishMetadataStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishMetadataStrategy.java
new file mode 100644
index 0000000000..f253c79701
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishMetadataStrategy.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+public enum PublishMetadataStrategy {
+ USE_RECORD_METADATA,
+ USE_CONFIGURED_VALUES;
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishStrategy.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishStrategy.java
new file mode 100644
index 0000000000..ec85236327
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishStrategy.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+/**
+ * Enumeration of strategies used by {@link PublishKafkaRecord_2_6} to map NiFi FlowFiles to Kafka records.
+ */
+public enum PublishStrategy {
+ USE_VALUE,
+ USE_WRAPPER;
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index e87c560a82..e983fdf568 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -26,16 +26,23 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
@@ -49,7 +56,9 @@ import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -67,6 +76,8 @@ public class PublisherLease implements Closeable {
private final boolean useTransactions;
private final Pattern attributeNameRegex;
private final Charset headerCharacterSet;
+ private final PublishStrategy publishStrategy;
+ private final RecordSetWriterFactory recordKeyWriterFactory;
private volatile boolean poisoned = false;
private final AtomicLong messagesSent = new AtomicLong(0L);
@@ -75,8 +86,9 @@ public class PublisherLease implements Closeable {
private InFlightMessageTracker tracker;
- public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger,
- final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+ public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis,
+ final ComponentLog logger, final boolean useTransactions, final Pattern attributeNameRegex,
+ final Charset headerCharacterSet, final PublishStrategy publishStrategy, final RecordSetWriterFactory recordKeyWriterFactory) {
this.producer = producer;
this.maxMessageSize = maxMessageSize;
this.logger = logger;
@@ -84,6 +96,8 @@ public class PublisherLease implements Closeable {
this.useTransactions = useTransactions;
this.attributeNameRegex = attributeNameRegex;
this.headerCharacterSet = headerCharacterSet;
+ this.publishStrategy = publishStrategy;
+ this.recordKeyWriterFactory = recordKeyWriterFactory;
}
protected void poison() {
@@ -172,7 +186,7 @@ public class PublisherLease implements Closeable {
}
void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
- final String messageKeyField, final String topic, final Function<Record, Integer> partitioner) throws IOException {
+ final String messageKeyField, final String explicitTopic, final Function<Record, Integer> partitioner, final PublishMetadataStrategy metadataStrategy) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker(logger);
}
@@ -187,19 +201,54 @@ public class PublisherLease implements Closeable {
recordCount++;
baos.reset();
- Map<String, String> additionalAttributes = Collections.emptyMap();
- try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
- final WriteResult writeResult = writer.write(record);
- additionalAttributes = writeResult.getAttributes();
- writer.flush();
+ final String topic;
+ final List<Header> headers;
+ final byte[] messageContent;
+ final byte[] messageKey;
+ Integer partition;
+ if (PublishStrategy.USE_WRAPPER.equals(publishStrategy)) {
+ headers = toHeadersWrapper(record.getValue("headers"));
+ final Object key = record.getValue("key");
+ final Object value = record.getValue("value");
+ messageContent = toByteArray("value", value, writerFactory, flowFile);
+ messageKey = toByteArray("key", key, recordKeyWriterFactory, flowFile);
+
+ if (metadataStrategy == PublishMetadataStrategy.USE_RECORD_METADATA) {
+ final Object metadataObject = record.getValue("metadata");
+ if (metadataObject instanceof Record) {
+ final Record metadataRecord = (Record) metadataObject;
+ final String recordTopic = metadataRecord.getAsString("topic");
+ topic = recordTopic == null ? explicitTopic : recordTopic;
+
+ try {
+ partition = metadataRecord.getAsInt("partition");
+ } catch (final Exception e) {
+ logger.warn("Encountered invalid partition for record in {}; will use configured partitioner for Record", flowFile);
+ partition = partitioner == null ? null : partitioner.apply(record);
+ }
+ } else {
+ topic = explicitTopic;
+ partition = partitioner == null ? null : partitioner.apply(record);
+ }
+ } else {
+ topic = explicitTopic;
+ partition = partitioner == null ? null : partitioner.apply(record);
+ }
+ } else {
+ final Map<String, String> additionalAttributes;
+ try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
+ final WriteResult writeResult = writer.write(record);
+ additionalAttributes = writeResult.getAttributes();
+ writer.flush();
+ }
+ headers = toHeaders(flowFile, additionalAttributes);
+ messageContent = baos.toByteArray();
+ messageKey = getMessageKey(flowFile, writerFactory, record.getValue(messageKeyField));
+ topic = explicitTopic;
+ partition = partitioner == null ? null : partitioner.apply(record);
}
- final byte[] messageContent = baos.toByteArray();
- final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
- final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
-
- final Integer partition = partitioner == null ? null : partitioner.apply(record);
- publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker, partition);
+ publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@@ -212,7 +261,7 @@ public class PublisherLease implements Closeable {
}
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
- } catch (final SchemaNotFoundException snfe) {
+ } catch (final SchemaNotFoundException | MalformedRecordException snfe) {
throw new IOException(snfe);
} catch (final Exception e) {
tracker.fail(flowFile, e);
@@ -221,6 +270,146 @@ public class PublisherLease implements Closeable {
}
}
+ private List<Header> toHeadersWrapper(final Object fieldHeaders) {
+ final List<Header> headers = new ArrayList<>();
+ if (fieldHeaders instanceof Record) {
+ final Record recordHeaders = (Record) fieldHeaders;
+ for (String fieldName : recordHeaders.getRawFieldNames()) {
+ final String fieldValue = recordHeaders.getAsString(fieldName);
+ headers.add(new RecordHeader(fieldName, fieldValue.getBytes(StandardCharsets.UTF_8)));
+ }
+ }
+ return headers;
+ }
+
+ private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
+ private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
+ private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
+ private static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(Arrays.asList(
+ FIELD_TOPIC, FIELD_PARTITION, FIELD_TIMESTAMP));
+ private static final RecordField FIELD_METADATA = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
+ private static final RecordField FIELD_HEADERS = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+
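+ /**
+  * Wrap a single outgoing record in the 'Use Wrapper' layout: the configured message key field is exposed as
+  * 'key', the original record as 'value', and 'headers'/'metadata' (topic, timestamp) are populated alongside.
+  */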
+ private Record toWrapperRecord(final Record record, final List<Header> headers,
+ final String messageKeyField, final String topic) {
+ final Record recordKey = (Record) record.getValue(messageKeyField);
+ final RecordSchema recordSchemaKey = ((recordKey == null) ? null : recordKey.getSchema());
+ final RecordField fieldKey = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(recordSchemaKey));
+ final RecordField fieldValue = new RecordField("value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+ final RecordSchema schemaWrapper = new SimpleRecordSchema(Arrays.asList(FIELD_METADATA, FIELD_HEADERS, fieldKey, fieldValue));
+
+ final Map<String, Object> valuesMetadata = new HashMap<>();
+ valuesMetadata.put("topic", topic);
+ valuesMetadata.put("timestamp", getTimestamp());
+ final Record recordMetadata = new MapRecord(SCHEMA_METADATA, valuesMetadata);
+
+ final Map<String, Object> valuesHeaders = new HashMap<>();
+ for (Header header : headers) {
+ valuesHeaders.put(header.key(), new String(header.value(), headerCharacterSet));
+ }
+
+ final Map<String, Object> valuesWrapper = new HashMap<>();
+ valuesWrapper.put("metadata", recordMetadata);
+ valuesWrapper.put("headers", valuesHeaders);
+ valuesWrapper.put("key", record.getValue(messageKeyField));
+ valuesWrapper.put("value", record);
+ return new MapRecord(schemaWrapper, valuesWrapper);
+ }
+
+ protected long getTimestamp() {
+ return System.currentTimeMillis();
+ }
+
+ private List<Header> toHeaders(final FlowFile flowFile, final Map<String, ?> additionalAttributes) {
+ if (attributeNameRegex == null) {
+ return Collections.emptyList();
+ }
+
+ final List headers = new ArrayList<>();
+ for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+ if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+ headers.add(new RecordHeader(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)));
+ }
+ }
+
+ for (final Map.Entry<String, ?> entry : additionalAttributes.entrySet()) {
+ if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+ final Object value = entry.getValue();
+ if (value != null) {
+ final String valueString = value.toString();
+ headers.add(new RecordHeader(entry.getKey(), valueString.getBytes(headerCharacterSet)));
+ }
+ }
+ }
+ return headers;
+ }
+
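+ /**
+  * Serialize a wrapper field ('key' or 'value') to bytes: Record values are written using the supplied
+  * RecordSetWriterFactory, Byte[] values are unboxed to byte[], and String values are encoded as UTF-8.
+  */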
+ private byte[] toByteArray(final String name, final Object object, final RecordSetWriterFactory writerFactory, final FlowFile flowFile)
+ throws IOException, SchemaNotFoundException, MalformedRecordException {
+ if (object == null) {
+ return null;
+ } else if (object instanceof Record) {
+ if (writerFactory == null) {
+ throw new MalformedRecordException("Record has a key that is itself a record, but the 'Record Key Writer' of the processor was not configured. If Records are expected to have a " +
+ "Record as the key, the 'Record Key Writer' property must be set.");
+ }
+
+ final Record record = (Record) object;
+ final RecordSchema schema = record.getSchema();
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
+ writer.write(record);
+ writer.flush();
+ return baos.toByteArray();
+ }
+ } else if (object instanceof Byte[]) {
+ final Byte[] bytesUppercase = (Byte[]) object;
+ final byte[] bytes = new byte[bytesUppercase.length];
+ for (int i = 0; (i < bytesUppercase.length); ++i) {
+ bytes[i] = bytesUppercase[i];
+ }
+ return bytes;
+ } else if (object instanceof String) {
+ final String string = (String) object;
+ return string.getBytes(StandardCharsets.UTF_8);
+ } else {
+ throw new MalformedRecordException(String.format("Couldn't convert %s record data to byte array.", name));
+ }
+ }
+
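+ /**
+  * Serialize the message key: byte[] values pass through, Byte[] values are unboxed to byte[], Record values
+  * are written using the supplied writer factory, and any other value is converted via toString() and encoded
+  * as UTF-8.
+  */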
+ private byte[] getMessageKey(final FlowFile flowFile, final RecordSetWriterFactory writerFactory,
+ final Object keyValue) throws IOException, SchemaNotFoundException {
+ final byte[] messageKey;
+ if (keyValue == null) {
+ messageKey = null;
+ } else if (keyValue instanceof byte[]) {
+ messageKey = (byte[]) keyValue;
+ } else if (keyValue instanceof Byte[]) {
+ // This case exists because in our Record API we currently don't have a BYTES type, we use an Array of type
+ // Byte, which creates a Byte[] instead of a byte[]. We should address this in the future, but for now we
+ // account for it here.
+ final Byte[] bytes = (Byte[]) keyValue;
+ final byte[] bytesPrimitive = new byte[bytes.length];
+ for (int i = 0; i < bytes.length; i++) {
+ bytesPrimitive[i] = bytes[i];
+ }
+ messageKey = bytesPrimitive;
+ } else if (keyValue instanceof Record) {
+ final Record keyRecord = (Record) keyValue;
+ try (final ByteArrayOutputStream os = new ByteArrayOutputStream(1024)) {
+ try (final RecordSetWriter writerKey = writerFactory.createWriter(logger, keyRecord.getSchema(), os, flowFile)) {
+ writerKey.write(keyRecord);
+ writerKey.flush();
+ }
+ messageKey = os.toByteArray();
+ }
+ } else {
+ final String keyString = keyValue.toString();
+ messageKey = keyString.getBytes(StandardCharsets.UTF_8);
+ }
+ return messageKey;
+ }
+
+ private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
if (attributeNameRegex == null) {
return;
@@ -241,15 +430,14 @@ public class PublisherLease implements Closeable {
}
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
- publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker, partition);
+ publish(flowFile, Collections.emptyList(), messageKey, messageContent, topic, tracker, partition);
}
- protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes, final byte[] messageKey, final byte[] messageContent,
+ protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent,
final String topic, final InFlightMessageTracker tracker, final Integer partition) {
final Integer moddedPartition = partition == null ? null : Math.abs(partition) % (producer.partitionsFor(topic).size());
- final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, moddedPartition, messageKey, messageContent);
- addHeaders(flowFile, additionalAttributes, record);
+ final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, moddedPartition, messageKey, messageContent, headers);
producer.send(record, new Callback() {
@Override
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
index bd3a664324..e152812b21 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.io.Closeable;
import java.nio.charset.Charset;
@@ -41,12 +42,15 @@ public class PublisherPool implements Closeable {
private final boolean useTransactions;
private final Pattern attributeNameRegex;
private final Charset headerCharacterSet;
+ private final PublishStrategy publishStrategy;
+ private final RecordSetWriterFactory recordKeyWriterFactory;
private Supplier<String> transactionalIdSupplier;
private volatile boolean closed = false;
PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis,
- final boolean useTransactions, final Supplier<String> transactionalIdSupplier, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+ final boolean useTransactions, final Supplier<String> transactionalIdSupplier, final Pattern attributeNameRegex,
+ final Charset headerCharacterSet, final PublishStrategy publishStrategy, final RecordSetWriterFactory recordKeyWriterFactory) {
this.logger = logger;
this.publisherQueue = new LinkedBlockingQueue<>();
this.kafkaProperties = kafkaProperties;
@@ -55,6 +59,8 @@ public class PublisherPool implements Closeable {
this.useTransactions = useTransactions;
this.attributeNameRegex = attributeNameRegex;
this.headerCharacterSet = headerCharacterSet;
+ this.publishStrategy = publishStrategy;
+ this.recordKeyWriterFactory = recordKeyWriterFactory;
this.transactionalIdSupplier = transactionalIdSupplier;
}
@@ -79,7 +85,8 @@ public class PublisherPool implements Closeable {
}
final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties);
- final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) {
+ final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger,
+ useTransactions, attributeNameRegex, headerCharacterSet, publishStrategy, recordKeyWriterFactory) {
private volatile boolean closed = false;
@Override
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
index 1150e9cbe5..3b1d2d33a3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html
@@ -256,5 +256,132 @@
ssl.client.auth property.
+ <h3>Output Strategies</h3>
+
+ <p>This processor offers multiple output strategies (configured via processor property 'Output
+ Strategy') for converting Kafka records into FlowFiles.</p>
+ <ul>
+ <li>Output Strategy 'Write Value Only' (the default) emits FlowFile records containing only the Kafka
+ record value.</li>
+ <li>Output Strategy 'Use Wrapper' (new) emits FlowFile records containing the Kafka record key, value,
+ and headers, as well as additional metadata from the Kafka record.</li>
+ </ul>
+
+ <p>The record schema that is used when 'Use Wrapper' is active is as follows (in Avro format):</p>
+ <pre>
+{
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [{
+ "name": "key",
+ "type": [{
+ < Schema is determined by the Key Record Reader, or will be "string" or "bytes", depending on the "Key Format" property (see below for more details) >
+ }, "null"]
+ },
+ {
+ "name": "value",
+ "type": [
+ {
+ < Schema is determined by the Value Record Reader >
+ },
+ "null"
+ ]
+ },
+ {
+ "name": "headers",
+ "type": [
+ { "type": "map", "values": "string", "default": {}},
+ "null"]
+ },
+ {
+ "name": "metadata",
+ "type": [
+ {
+ "type": "record",
+ "name": "metadataType",
+ "fields": [
+ { "name": "topic", "type": ["string", "null"] },
+ { "name": "partition", "type": ["int", "null"] },
+ { "name": "offset", "type": ["int", "null"] },
+ { "name": "timestamp", "type": ["long", "null"] }
+ ]
+ },
+ "null"
+ ]
+ }
+ ]
+}
+ </pre>
+
+ <p>If the Output Strategy property is set to 'Use Wrapper', an additional processor configuration property
+ ('Key Format') is activated. This property is used to specify how the Kafka Record's key should be written out to the FlowFile.
+ The possible values for 'Key Format' are as follows:</p>
+ <ul>
+ <li>'Byte Array' supplies the Kafka Record Key as a byte array, exactly as it is received in the Kafka record.</li>
+ <li>'String' converts the Kafka Record Key bytes into a string using the UTF-8 character encoding.
+ (Failure to parse the key bytes as UTF-8 will result in the record being routed to the
+ 'parse.failure' relationship.)</li>
+ <li>'Record' converts the Kafka Record Key bytes into a deserialized NiFi record, using the associated
+ 'Key Record Reader' controller service.</li>
+ </ul>
+
+ <p>If the Key Format property is set to 'Record', an additional processor configuration property named 'Key Record Reader' is
+ made available. This property is used to specify the Record Reader to use in order to parse the Kafka Record's key as a Record.</p>
+
+ <p>Here is an example of FlowFile content that is emitted by JsonRecordSetWriter
+ when strategy 'Use Wrapper' is active:</p>
+ <pre>
+[
+ {
+ "key": {
+ "name": "Acme",
+ "number": "AC1234"
+ },
+ "value": {
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number": "AC1234"
+ }
+ },
+ "headers": {
+ "attributeA": "valueA",
+ "attributeB": "valueB"
+ },
+ "metadata": {
+ "topic": "accounts",
+ "partition": 0,
+ "offset": 0,
+ "timestamp": 0
+ }
+ }
+]
+ </pre>
+
+ <p>These new processor properties may be used to extend the capabilities of ConsumeKafkaRecord_2_6, by
+ optionally incorporating additional information from the Kafka record (key, headers, metadata) into the
+ outbound FlowFile. Additionally, the Kafka record's key may now be interpreted as a record, rather than as a string.
+ This enables additional decision-making by downstream processors in your flow and enables handling of records where
+ the key is complex, such as an Avro record.</p>
+
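+ <p>As an illustrative sketch only (not something this processor requires), a custom downstream component could
+ navigate a 'Use Wrapper' record from the example above with the NiFi Record API
+ (org.apache.nifi.serialization.record.Record), after parsing the FlowFile with a matching record reader;
+ the method name and the checks below are hypothetical:</p>
+ <pre>
+ // hypothetical check against the wrapper record shown in the example above
+ private boolean isAcmeAccount(final Record wrapper) {
+     final Record key = (Record) wrapper.getValue("key");           // null when the Kafka record had no key
+     final Record metadata = (Record) wrapper.getValue("metadata");
+     final String topic = (metadata == null) ? null : metadata.getAsString("topic");
+     return (key != null) && "Acme".equals(key.getAsString("name")) && "accounts".equals(topic);
+ }
+ </pre>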
+ <p>Additionally, the choice of the 'Output Strategy' property affects the related properties
+ 'Headers to Add as Attributes (Regex)' and 'Key Attribute Encoding'. Since Output Strategy 'Use
+ Wrapper' includes headers and keys in the FlowFile content, they are not also added to the FlowFile
+ attributes. These properties are available only when the Output Strategy is set to 'Write
+ Value Only'.</p>
+