NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key (#6131)

* NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key
* publisher wrapper record; property naming and display order; doc updates
greyp9 2022-11-16 16:07:43 -05:00 committed by GitHub
parent 9e3987d04e
commit 30facedc43
28 changed files with 3091 additions and 103 deletions

View File

@ -100,6 +100,18 @@
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
<version>1.19.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -30,6 +30,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
@ -37,7 +38,6 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@ -62,6 +62,11 @@ import java.util.regex.Pattern;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_BYTE_ARRAY;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_RECORD;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_STRING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.OUTPUT_USE_VALUE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.OUTPUT_USE_WRAPPER;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
@CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. "
@ -114,7 +119,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
static final PropertyDescriptor RECORD_READER = new Builder()
.name("record-reader")
.displayName("Record Reader")
.displayName("Value Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@ -123,7 +128,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
static final PropertyDescriptor RECORD_WRITER = new Builder()
.name("record-writer")
.displayName("Record Writer")
.displayName("Record Value Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
@ -212,6 +217,31 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.defaultValue("UTF-8")
.required(false)
.build();
static final PropertyDescriptor OUTPUT_STRATEGY = new PropertyDescriptor.Builder()
.name("output-strategy")
.displayName("Output Strategy")
.description("The format used to output the Kafka record into a FlowFile record.")
.required(true)
.defaultValue(OUTPUT_USE_VALUE.getValue())
.allowableValues(OUTPUT_USE_VALUE, OUTPUT_USE_WRAPPER)
.build();
static final PropertyDescriptor KEY_FORMAT = new PropertyDescriptor.Builder()
.name("key-format")
.displayName("Key Format")
.description("Specifies how to represent the Kafka Record's Key in the output")
.required(true)
.defaultValue(KEY_AS_BYTE_ARRAY.getValue())
.allowableValues(KEY_AS_STRING, KEY_AS_BYTE_ARRAY, KEY_AS_RECORD)
.dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_WRAPPER)
.build();
static final PropertyDescriptor KEY_RECORD_READER = new PropertyDescriptor.Builder()
.name("key-record-reader")
.displayName("Key Record Reader")
.description("The Record Reader to use for parsing the Kafka Record's key into a Record")
.identifiesControllerService(RecordReaderFactory.class)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.dependsOn(KEY_FORMAT, KEY_AS_RECORD)
.build();
static final PropertyDescriptor HEADER_NAME_REGEX = new Builder()
.name("header-name-regex")
.displayName("Headers to Add as Attributes (Regex)")
@ -224,6 +254,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_VALUE)
.build();
static final PropertyDescriptor SEPARATE_BY_KEY = new Builder()
@ -242,6 +273,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.required(true)
.defaultValue(UTF8_ENCODING.getValue())
.allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
.dependsOn(OUTPUT_STRATEGY, OUTPUT_USE_VALUE)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
@ -268,6 +300,11 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
descriptors.add(RECORD_READER);
descriptors.add(RECORD_WRITER);
descriptors.add(GROUP_ID);
descriptors.add(OUTPUT_STRATEGY);
descriptors.add(HEADER_NAME_REGEX);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(KEY_FORMAT);
descriptors.add(KEY_RECORD_READER);
descriptors.add(COMMIT_OFFSETS);
descriptors.add(MAX_UNCOMMITTED_TIME);
descriptors.add(HONOR_TRANSACTIONS);
@ -282,10 +319,8 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
descriptors.add(SEPARATE_BY_KEY);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(MESSAGE_HEADER_ENCODING);
descriptors.add(HEADER_NAME_REGEX);
descriptors.add(MAX_POLL_RECORDS);
descriptors.add(COMMS_TIMEOUT);
DESCRIPTORS = Collections.unmodifiableList(descriptors);
@ -404,8 +439,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty(OUTPUT_STRATEGY).getValue());
final String keyFormat = context.getProperty(KEY_FORMAT).getValue();
final RecordReaderFactory keyReaderFactory = context.getProperty(KEY_RECORD_READER).asControllerService(RecordReaderFactory.class);
final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue();
final Pattern headerNamePattern = headerNameRegex == null ? null : Pattern.compile(headerNameRegex);
final boolean isActiveHeaderNamePattern = (OutputStrategy.USE_VALUE.equals(outputStrategy) && (headerNameRegex != null));
final Pattern headerNamePattern = isActiveHeaderNamePattern ? Pattern.compile(headerNameRegex) : null;
final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean();
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
@ -426,11 +466,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
}
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume, commitOffsets);
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
commitOffsets, outputStrategy, keyFormat, keyReaderFactory);
} else if (topicType.equals(TOPIC_PATTERN.getValue())) {
final Pattern topicPattern = Pattern.compile(topicListing.trim());
return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume, commitOffsets);
bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding, partitionsToConsume,
commitOffsets, outputStrategy, keyFormat, keyReaderFactory);
} else {
getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
return null;

View File

@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@ -30,15 +31,21 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaValidationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.Tuple;
import javax.xml.bind.DatatypeConverter;
import java.io.ByteArrayInputStream;
@ -75,6 +82,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
* lease may only belong to a single thread a time.
*/
public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
private static final RecordField EMPTY_SCHEMA_KEY_RECORD_FIELD =
new RecordField("key", RecordFieldType.RECORD.getRecordDataType(new SimpleRecordSchema(Collections.emptyList())));
private final Long maxWaitMillis;
private final Consumer<byte[], byte[]> kafkaConsumer;
@ -89,6 +98,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
private final Pattern headerNamePattern;
private final boolean separateByKey;
private final boolean commitOffsets;
private final OutputStrategy outputStrategy;
private final String keyFormat;
private final RecordReaderFactory keyReaderFactory;
private boolean poisoned = false;
//used for tracking demarcated flowfiles to their TopicPartition so we can append
//to them on subsequent poll calls
@ -111,7 +123,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final Charset headerCharacterSet,
final Pattern headerNamePattern,
final boolean separateByKey,
final boolean commitMessageOffsets) {
final boolean commitMessageOffsets,
final OutputStrategy outputStrategy,
final String keyFormat,
final RecordReaderFactory keyReaderFactory) {
this.maxWaitMillis = maxWaitMillis;
this.kafkaConsumer = kafkaConsumer;
this.demarcatorBytes = demarcatorBytes;
@ -125,6 +140,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
this.headerNamePattern = headerNamePattern;
this.separateByKey = separateByKey;
this.commitOffsets = commitMessageOffsets;
this.outputStrategy = outputStrategy;
this.keyFormat = keyFormat;
this.keyReaderFactory = keyReaderFactory;
}
/**
@ -378,9 +396,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
} else if (readerFactory != null && writerFactory != null) {
writeRecordData(getProcessSession(), messages, partition);
} else {
messages.forEach(message -> {
writeData(getProcessSession(), message, partition);
});
messages.forEach(message -> writeData(getProcessSession(), message, partition));
}
totalMessages += messages.size();
@ -447,9 +463,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
tracker.incrementRecordCount(1, record.offset(), record.leaderEpoch().orElse(null));
final byte[] value = record.value();
if (value != null) {
flowFile = session.write(flowFile, out -> {
out.write(value);
});
flowFile = session.write(flowFile, out -> out.write(value));
}
flowFile = session.putAllAttributes(flowFile, getAttributes(record));
tracker.updateFlowFile(flowFile);
@ -580,7 +594,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
try {
reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
} catch (final IOException e) {
yield();
this.yield();
rollback(topicPartition);
handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
closeWriter(writer);
@ -591,55 +605,21 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
try {
int recordCount = 0;
Record record;
while ((record = reader.nextRecord()) != null) {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
BundleTracker tracker = bundleMap.get(bundleInfo);
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
final OutputStream rawOut = session.write(flowFile);
final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
rollback(topicPartition);
yield();
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
if (OutputStrategy.USE_WRAPPER.equals(outputStrategy)) {
record = toWrapperRecord(consumerRecord, record);
}
try {
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
continue;
}
tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
session.adjustCounter("Records Received", 1L, false);
writer = writeRecord(session, consumerRecord, topicPartition, record, attributes);
++recordCount;
}
if ((recordCount == 0) && (OutputStrategy.USE_WRAPPER.equals(outputStrategy))) {
// special processing of wrapper record with null value
writer = writeRecord(session, consumerRecord, topicPartition, toWrapperRecord(consumerRecord, null), attributes);
}
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
handleParseFailure(consumerRecord, session, e);
continue;
}
}
}
@ -653,6 +633,132 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
}
}
private RecordSetWriter writeRecord(final ProcessSession session, final ConsumerRecord<byte[], byte[]> consumerRecord, final TopicPartition topicPartition,
final Record record, final Map<String, String> attributes) throws SchemaNotFoundException, IOException {
// Determine the bundle for this record.
final RecordSchema recordSchema = record.getSchema();
final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null);
BundleTracker tracker = bundleMap.get(bundleInfo);
final RecordSetWriter writer;
if (tracker == null) {
FlowFile flowFile = session.create();
flowFile = session.putAllAttributes(flowFile, attributes);
final OutputStream rawOut = session.write(flowFile);
final RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
} catch (final Exception e) {
logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
rollback(topicPartition);
this.yield();
throw new ProcessException(e);
}
writer = writerFactory.createWriter(logger, writeSchema, rawOut, flowFile);
writer.beginRecordSet();
tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
tracker.updateFlowFile(flowFile);
bundleMap.put(bundleInfo, tracker);
} else {
writer = tracker.recordWriter;
}
try {
writer.write(record);
} catch (final RuntimeException re) {
handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ "Will route message as its own FlowFile to the 'parse.failure' relationship");
return writer;
}
tracker.incrementRecordCount(1L, consumerRecord.offset(), consumerRecord.leaderEpoch().orElse(null));
session.adjustCounter("Records Received", 1L, false);
return writer;
}
private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
throws IOException, SchemaNotFoundException, MalformedRecordException {
final Tuple<RecordField, Object> tupleKey = toWrapperRecordKey(consumerRecord);
final Tuple<RecordField, Object> tupleValue = toWrapperRecordValue(record);
final Tuple<RecordField, Object> tupleHeaders = toWrapperRecordHeaders(consumerRecord);
final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
final Map<String, Object> recordValues = new HashMap<>();
recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
return new MapRecord(rootRecordSchema, recordValues);
}
private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
throws IOException, SchemaNotFoundException, MalformedRecordException {
final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
if (KafkaProcessorUtils.KEY_AS_RECORD.getValue().equals(keyFormat)) {
if (key.length == 0) {
return new Tuple<>(EMPTY_SCHEMA_KEY_RECORD_FIELD, null);
}
final Map<String, String> attributes = getAttributes(consumerRecord);
try (final InputStream is = new ByteArrayInputStream(key);
final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger)) {
final Record record = reader.nextRecord();
final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
return new Tuple<>(recordField, record);
}
} else if (KafkaProcessorUtils.KEY_AS_STRING.getValue().equals(keyFormat)) {
final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
final String keyString = ((key == null) ? null : new String(key, StandardCharsets.UTF_8));
return new Tuple<>(recordField, keyString);
} else {
final RecordField recordField = new RecordField("key", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
return new Tuple<>(recordField, ArrayUtils.toObject(key));
}
}
private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
final RecordSchema recordSchema = (record == null) ? null : record.getSchema();
final RecordField recordField = new RecordField("value", RecordFieldType.RECORD.getRecordDataType(recordSchema));
return new Tuple<>(recordField, record);
}
private Tuple<RecordField, Object> toWrapperRecordHeaders(final ConsumerRecord<byte[], byte[]> consumerRecord) {
final RecordField recordField = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
final Map<String, String> headers = new HashMap<>();
for (final Header header : consumerRecord.headers()) {
headers.put(header.key(), new String(header.value(), headerCharacterSet));
}
return new Tuple<>(recordField, headers);
}
private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
private static final RecordField FIELD_OFFSET = new RecordField("offset", RecordFieldType.LONG.getDataType());
private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
private static final RecordSchema SCHEMA_WRAPPER = new SimpleRecordSchema(Arrays.asList(
FIELD_TOPIC, FIELD_PARTITION, FIELD_OFFSET, FIELD_TIMESTAMP));
private Tuple<RecordField, Object> toWrapperRecordMetadata(final ConsumerRecord<byte[], byte[]> consumerRecord) {
final RecordField recordField = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_WRAPPER));
final Map<String, Object> metadata = new HashMap<>();
metadata.put("topic", consumerRecord.topic());
metadata.put("partition", consumerRecord.partition());
metadata.put("offset", consumerRecord.offset());
metadata.put("timestamp", consumerRecord.timestamp());
final Record record = new MapRecord(SCHEMA_WRAPPER, metadata);
return new Tuple<>(recordField, record);
}
private void closeWriter(final RecordSetWriter writer) {
try {
if (writer != null) {

View File

@ -76,6 +76,9 @@ public class ConsumerPool implements Closeable {
private final boolean separateByKey;
private final int[] partitionsToConsume;
private final boolean commitOffsets;
private final OutputStrategy outputStrategy;
private final String keyFormat;
private final RecordReaderFactory keyReaderFactory;
private final AtomicLong consumerCreatedCountRef = new AtomicLong();
private final AtomicLong consumerClosedCountRef = new AtomicLong();
private final AtomicLong leasesObtainedCountRef = new AtomicLong();
@ -135,6 +138,9 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
this.outputStrategy = null;
this.keyFormat = null;
this.keyReaderFactory = null;
enqueueAssignedPartitions(partitionsToConsume);
}
@ -172,6 +178,9 @@ public class ConsumerPool implements Closeable {
this.separateByKey = separateByKey;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
this.outputStrategy = null;
this.keyFormat = null;
this.keyReaderFactory = null;
enqueueAssignedPartitions(partitionsToConsume);
}
@ -191,7 +200,10 @@ public class ConsumerPool implements Closeable {
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume,
final boolean commitOffsets) {
final boolean commitOffsets,
final OutputStrategy outputStrategy,
final String keyFormat,
final RecordReaderFactory keyReaderFactory) {
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@ -210,6 +222,9 @@ public class ConsumerPool implements Closeable {
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
this.outputStrategy = outputStrategy;
this.keyFormat = keyFormat;
this.keyReaderFactory = keyReaderFactory;
enqueueAssignedPartitions(partitionsToConsume);
}
@ -229,7 +244,10 @@ public class ConsumerPool implements Closeable {
final boolean separateByKey,
final String keyEncoding,
final int[] partitionsToConsume,
final boolean commitOffsets) {
final boolean commitOffsets,
final OutputStrategy outputStrategy,
final String keyFormat,
final RecordReaderFactory keyReaderFactory) {
this.pooledLeases = new LinkedBlockingQueue<>();
this.maxWaitMillis = maxWaitMillis;
this.logger = logger;
@ -248,6 +266,9 @@ public class ConsumerPool implements Closeable {
this.keyEncoding = keyEncoding;
this.partitionsToConsume = partitionsToConsume;
this.commitOffsets = commitOffsets;
this.outputStrategy = outputStrategy;
this.keyFormat = keyFormat;
this.keyReaderFactory = keyReaderFactory;
enqueueAssignedPartitions(partitionsToConsume);
}
@ -619,7 +640,8 @@ public class ConsumerPool implements Closeable {
private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer, final List<TopicPartition> assignedPartitions) {
super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey, commitOffsets);
readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey,
commitOffsets, outputStrategy, keyFormat, keyReaderFactory);
this.consumer = consumer;
this.assignedPartitions = assignedPartitions;
}

View File

@ -110,6 +110,18 @@ public final class KafkaProcessorUtils {
"When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
"For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
static final AllowableValue PUBLISH_USE_VALUE = new AllowableValue(PublishStrategy.USE_VALUE.name(),
"Use Content as Record Value", "Write only the FlowFile content to the Kafka Record value.");
static final AllowableValue PUBLISH_USE_WRAPPER = new AllowableValue(PublishStrategy.USE_WRAPPER.name(),
"Use Wrapper", "Write the Kafka Record key, value, headers, and metadata into the Kafka Record value. (See processor usage for more information.)");
static final AllowableValue OUTPUT_USE_VALUE = new AllowableValue(OutputStrategy.USE_VALUE.name(),
"Use Content as Value", "Write only the Kafka Record value to the FlowFile record.");
static final AllowableValue OUTPUT_USE_WRAPPER = new AllowableValue(OutputStrategy.USE_WRAPPER.name(),
"Use Wrapper", "Write the Kafka Record key, value, headers, and metadata into the FlowFile record. (See processor usage for more information.)");
static final AllowableValue KEY_AS_STRING = new AllowableValue("string", "String", "Format the Kafka ConsumerRecord key as a UTF-8 string.");
static final AllowableValue KEY_AS_BYTE_ARRAY = new AllowableValue("byte-array", "Byte Array", "Format the Kafka ConsumerRecord key as a byte array.");
static final AllowableValue KEY_AS_RECORD = new AllowableValue("record", "Record", "Format the Kafka ConsumerRecord key as a record.");
public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
.name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.displayName("Kafka Brokers")

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
/**
* Enumeration of strategies used by {@link ConsumeKafkaRecord_2_6} to map Kafka records to NiFi FlowFiles.
*/
public enum OutputStrategy {
USE_VALUE,
USE_WRAPPER;
}

View File

@ -83,6 +83,8 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.PUBLISH_USE_VALUE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.PUBLISH_USE_WRAPPER;
@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.6"})
@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.6 Producer API. "
@ -125,6 +127,11 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
"Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
"so all Records in a given FlowFile will go to the same partition.");
static final AllowableValue RECORD_METADATA_FROM_RECORD = new AllowableValue("Metadata From Record", "Metadata From Record", "The Kafka Record's Topic and Partition will be determined by " +
"looking at the /metadata/topic and /metadata/partition fields of the Record, respectively. If these fields are invalid or not present, the Topic Name and Partition/Partitioner class " +
"properties of the processor will be considered.");
static final AllowableValue RECORD_METADATA_FROM_PROPERTIES = new AllowableValue("Use Configured Values", "Use Configured Values", "The Kafka Record's Topic will be determined using the 'Topic " +
"Name' processor property. The partition will be determined using the 'Partition' and 'Partitioner class' properties.");
static final PropertyDescriptor TOPIC = new Builder()
.name("topic")
@ -153,12 +160,22 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.required(true)
.build();
static final PropertyDescriptor PUBLISH_STRATEGY = new PropertyDescriptor.Builder()
.name("publish-strategy")
.displayName("Publish Strategy")
.description("The format used to publish the incoming FlowFile record to Kafka.")
.required(true)
.defaultValue(PUBLISH_USE_VALUE.getValue())
.allowableValues(PUBLISH_USE_VALUE, PUBLISH_USE_WRAPPER)
.build();
static final PropertyDescriptor MESSAGE_KEY_FIELD = new Builder()
.name("message-key-field")
.displayName("Message Key Field")
.description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)
.required(false)
.build();
@ -239,6 +256,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.expressionLanguageSupported(NONE)
.dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_VALUE)
.required(false)
.build();
static final PropertyDescriptor USE_TRANSACTIONS = new Builder()
@ -271,6 +289,23 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
.defaultValue("UTF-8")
.required(false)
.build();
static final PropertyDescriptor RECORD_KEY_WRITER = new PropertyDescriptor.Builder()
.name("record-key-writer")
.displayName("Record Key Writer")
.description("The Record Key Writer to use for outgoing FlowFiles")
.identifiesControllerService(RecordSetWriterFactory.class)
.dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_WRAPPER)
.build();
static final PropertyDescriptor RECORD_METADATA_STRATEGY = new Builder()
.name("Record Metadata Strategy")
.displayName("Record Metadata Strategy")
.description("Specifies whether the Record's metadata (topic and partition) should come from the Record's metadata field or if it should come from the configured Topic Name and Partition / " +
"Partitioner class properties")
.required(true)
.allowableValues(RECORD_METADATA_FROM_PROPERTIES, RECORD_METADATA_FROM_RECORD)
.defaultValue(RECORD_METADATA_FROM_PROPERTIES.getValue())
.dependsOn(PUBLISH_STRATEGY, PUBLISH_USE_WRAPPER)
.build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@ -298,6 +333,9 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
properties.add(TRANSACTIONAL_ID_PREFIX);
properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
properties.add(DELIVERY_GUARANTEE);
properties.add(PUBLISH_STRATEGY);
properties.add(RECORD_KEY_WRITER);
properties.add(RECORD_METADATA_STRATEGY);
properties.add(ATTRIBUTE_NAME_REGEX);
properties.add(MESSAGE_HEADER_ENCODING);
properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
@ -413,9 +451,11 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty(PUBLISH_STRATEGY).getValue());
final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty(RECORD_KEY_WRITER).asControllerService(RecordSetWriterFactory.class);
final Map<String, Object> kafkaProperties = new HashMap<>();
KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
@ -423,7 +463,8 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, transactionalIdSupplier, attributeNamePattern, charset);
return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis,
useTransactions, transactionalIdSupplier, attributeNamePattern, charset, publishStrategy, recordKeyWriterFactory);
}
@OnStopped
@ -455,6 +496,14 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
final PublishMetadataStrategy publishMetadataStrategy;
final String recordMetadataStrategy = context.getProperty(RECORD_METADATA_STRATEGY).getValue();
if (RECORD_METADATA_FROM_RECORD.getValue().equalsIgnoreCase(recordMetadataStrategy)) {
publishMetadataStrategy = PublishMetadataStrategy.USE_RECORD_METADATA;
} else {
publishMetadataStrategy = PublishMetadataStrategy.USE_CONFIGURED_VALUES;
}
final long startTime = System.nanoTime();
try (final PublisherLease lease = pool.obtainPublisher()) {
try {
@ -494,7 +543,7 @@ public class PublishKafkaRecord_2_6 extends AbstractProcessor implements Verifia
final RecordSet recordSet = reader.createRecordSet();
final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner, publishMetadataStrategy);
} catch (final SchemaNotFoundException | MalformedRecordException e) {
throw new ProcessException(e);
}

View File

@ -401,7 +401,8 @@ public class PublishKafka_2_6 extends AbstractProcessor implements VerifiablePro
kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, transactionalIdSupplier, attributeNamePattern, charset);
return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis,
useTransactions, transactionalIdSupplier, attributeNamePattern, charset, null, null);
}
@OnStopped

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
public enum PublishMetadataStrategy {
USE_RECORD_METADATA,
USE_CONFIGURED_VALUES;
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
/**
* Enumeration of strategies used by {@link PublishKafkaRecord_2_6} to map NiFi FlowFiles to Kafka records.
*/
public enum PublishStrategy {
USE_VALUE,
USE_WRAPPER;
}

View File

@ -26,16 +26,23 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.StreamUtils;
@ -49,7 +56,9 @@ import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -67,6 +76,8 @@ public class PublisherLease implements Closeable {
private final boolean useTransactions;
private final Pattern attributeNameRegex;
private final Charset headerCharacterSet;
private final PublishStrategy publishStrategy;
private final RecordSetWriterFactory recordKeyWriterFactory;
private volatile boolean poisoned = false;
private final AtomicLong messagesSent = new AtomicLong(0L);
@ -75,8 +86,9 @@ public class PublisherLease implements Closeable {
private InFlightMessageTracker tracker;
public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger,
final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis,
final ComponentLog logger, final boolean useTransactions, final Pattern attributeNameRegex,
final Charset headerCharacterSet, final PublishStrategy publishStrategy, final RecordSetWriterFactory recordKeyWriterFactory) {
this.producer = producer;
this.maxMessageSize = maxMessageSize;
this.logger = logger;
@ -84,6 +96,8 @@ public class PublisherLease implements Closeable {
this.useTransactions = useTransactions;
this.attributeNameRegex = attributeNameRegex;
this.headerCharacterSet = headerCharacterSet;
this.publishStrategy = publishStrategy;
this.recordKeyWriterFactory = recordKeyWriterFactory;
}
protected void poison() {
@ -172,7 +186,7 @@ public class PublisherLease implements Closeable {
}
void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
final String messageKeyField, final String topic, final Function<Record, Integer> partitioner) throws IOException {
final String messageKeyField, final String explicitTopic, final Function<Record, Integer> partitioner, final PublishMetadataStrategy metadataStrategy) throws IOException {
if (tracker == null) {
tracker = new InFlightMessageTracker(logger);
}
@ -187,19 +201,54 @@ public class PublisherLease implements Closeable {
recordCount++;
baos.reset();
Map<String, String> additionalAttributes = Collections.emptyMap();
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
final WriteResult writeResult = writer.write(record);
additionalAttributes = writeResult.getAttributes();
writer.flush();
final String topic;
final List<Header> headers;
final byte[] messageContent;
final byte[] messageKey;
Integer partition;
if (PublishStrategy.USE_WRAPPER.equals(publishStrategy)) {
headers = toHeadersWrapper(record.getValue("headers"));
final Object key = record.getValue("key");
final Object value = record.getValue("value");
messageContent = toByteArray("value", value, writerFactory, flowFile);
messageKey = toByteArray("key", key, recordKeyWriterFactory, flowFile);
if (metadataStrategy == PublishMetadataStrategy.USE_RECORD_METADATA) {
final Object metadataObject = record.getValue("metadata");
if (metadataObject instanceof Record) {
final Record metadataRecord = (Record) metadataObject;
final String recordTopic = metadataRecord.getAsString("topic");
topic = recordTopic == null ? explicitTopic : recordTopic;
try {
partition = metadataRecord.getAsInt("partition");
} catch (final Exception e) {
logger.warn("Encountered invalid partition for record in {}; will use configured partitioner for Record", flowFile);
partition = partitioner == null ? null : partitioner.apply(record);
}
} else {
topic = explicitTopic;
partition = partitioner == null ? null : partitioner.apply(record);
}
} else {
topic = explicitTopic;
partition = partitioner == null ? null : partitioner.apply(record);
}
} else {
final Map<String, String> additionalAttributes;
try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
final WriteResult writeResult = writer.write(record);
additionalAttributes = writeResult.getAttributes();
writer.flush();
}
headers = toHeaders(flowFile, additionalAttributes);
messageContent = baos.toByteArray();
messageKey = getMessageKey(flowFile, writerFactory, record.getValue(messageKeyField));
topic = explicitTopic;
partition = partitioner == null ? null : partitioner.apply(record);
}
final byte[] messageContent = baos.toByteArray();
final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
final Integer partition = partitioner == null ? null : partitioner.apply(record);
publish(flowFile, additionalAttributes, messageKey, messageContent, topic, tracker, partition);
publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition);
if (tracker.isFailed(flowFile)) {
// If we have a failure, don't try to send anything else.
@ -212,7 +261,7 @@ public class PublisherLease implements Closeable {
}
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
} catch (final SchemaNotFoundException snfe) {
} catch (final SchemaNotFoundException | MalformedRecordException snfe) {
throw new IOException(snfe);
} catch (final Exception e) {
tracker.fail(flowFile, e);
@ -221,6 +270,146 @@ public class PublisherLease implements Closeable {
}
}
private List<Header> toHeadersWrapper(final Object fieldHeaders) {
final List<Header> headers = new ArrayList<>();
if (fieldHeaders instanceof Record) {
final Record recordHeaders = (Record) fieldHeaders;
for (String fieldName : recordHeaders.getRawFieldNames()) {
final String fieldValue = recordHeaders.getAsString(fieldName);
headers.add(new RecordHeader(fieldName, fieldValue.getBytes(StandardCharsets.UTF_8)));
}
}
return headers;
}
private static final RecordField FIELD_TOPIC = new RecordField("topic", RecordFieldType.STRING.getDataType());
private static final RecordField FIELD_PARTITION = new RecordField("partition", RecordFieldType.INT.getDataType());
private static final RecordField FIELD_TIMESTAMP = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
private static final RecordSchema SCHEMA_METADATA = new SimpleRecordSchema(Arrays.asList(
FIELD_TOPIC, FIELD_PARTITION, FIELD_TIMESTAMP));
private static final RecordField FIELD_METADATA = new RecordField("metadata", RecordFieldType.RECORD.getRecordDataType(SCHEMA_METADATA));
private static final RecordField FIELD_HEADERS = new RecordField("headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
private Record toWrapperRecord(final Record record, final List<Header> headers,
final String messageKeyField, final String topic) {
final Record recordKey = (Record) record.getValue(messageKeyField);
final RecordSchema recordSchemaKey = ((recordKey == null) ? null : recordKey.getSchema());
final RecordField fieldKey = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(recordSchemaKey));
final RecordField fieldValue = new RecordField("value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
final RecordSchema schemaWrapper = new SimpleRecordSchema(Arrays.asList(FIELD_METADATA, FIELD_HEADERS, fieldKey, fieldValue));
final Map<String, Object> valuesMetadata = new HashMap<>();
valuesMetadata.put("topic", topic);
valuesMetadata.put("timestamp", getTimestamp());
final Record recordMetadata = new MapRecord(SCHEMA_METADATA, valuesMetadata);
final Map<String, Object> valuesHeaders = new HashMap<>();
for (Header header : headers) {
valuesHeaders.put(header.key(), new String(header.value(), headerCharacterSet));
}
final Map<String, Object> valuesWrapper = new HashMap<>();
valuesWrapper.put("metadata", recordMetadata);
valuesWrapper.put("headers", valuesHeaders);
valuesWrapper.put("key", record.getValue(messageKeyField));
valuesWrapper.put("value", record);
return new MapRecord(schemaWrapper, valuesWrapper);
}
protected long getTimestamp() {
return System.currentTimeMillis();
}
private List<Header> toHeaders(final FlowFile flowFile, final Map<String, ?> additionalAttributes) {
if (attributeNameRegex == null) {
return Collections.emptyList();
}
final List<Header> headers = new ArrayList<>();
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
if (attributeNameRegex.matcher(entry.getKey()).matches()) {
headers.add(new RecordHeader(entry.getKey(), entry.getValue().getBytes(headerCharacterSet)));
}
}
for (final Map.Entry<String, ?> entry : additionalAttributes.entrySet()) {
if (attributeNameRegex.matcher(entry.getKey()).matches()) {
final Object value = entry.getValue();
if (value != null) {
final String valueString = value.toString();
headers.add(new RecordHeader(entry.getKey(), valueString.getBytes(headerCharacterSet)));
}
}
}
return headers;
}
private byte[] toByteArray(final String name, final Object object, final RecordSetWriterFactory writerFactory, final FlowFile flowFile)
throws IOException, SchemaNotFoundException, MalformedRecordException {
if (object == null) {
return null;
} else if (object instanceof Record) {
if (writerFactory == null) {
throw new MalformedRecordException("Record has a key that is itself a record, but the 'Record Key Writer' of the processor was not configured. If Records are expected to have a " +
"Record as the key, the 'Record Key Writer' property must be set.");
}
final Record record = (Record) object;
final RecordSchema schema = record.getSchema();
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos, flowFile)) {
writer.write(record);
writer.flush();
return baos.toByteArray();
}
} else if (object instanceof Byte[]) {
final Byte[] bytesUppercase = (Byte[]) object;
final byte[] bytes = new byte[bytesUppercase.length];
for (int i = 0; (i < bytesUppercase.length); ++i) {
bytes[i] = bytesUppercase[i];
}
return bytes;
} else if (object instanceof String) {
final String string = (String) object;
return string.getBytes(StandardCharsets.UTF_8);
} else {
throw new MalformedRecordException(String.format("Couldn't convert %s record data to byte array.", name));
}
}
private byte[] getMessageKey(final FlowFile flowFile, final RecordSetWriterFactory writerFactory,
final Object keyValue) throws IOException, SchemaNotFoundException {
final byte[] messageKey;
if (keyValue == null) {
messageKey = null;
} else if (keyValue instanceof byte[]) {
messageKey = (byte[]) keyValue;
} else if (keyValue instanceof Byte[]) {
// This case exists because in our Record API we currently don't have a BYTES type, we use an Array of type
// Byte, which creates a Byte[] instead of a byte[]. We should address this in the future, but we should
// account for it here.
final Byte[] bytes = (Byte[]) keyValue;
final byte[] bytesPrimitive = new byte[bytes.length];
for (int i = 0; i < bytes.length; i++) {
bytesPrimitive[i] = bytes[i];
}
messageKey = bytesPrimitive;
} else if (keyValue instanceof Record) {
final Record keyRecord = (Record) keyValue;
try (final ByteArrayOutputStream os = new ByteArrayOutputStream(1024)) {
try (final RecordSetWriter writerKey = writerFactory.createWriter(logger, keyRecord.getSchema(), os, flowFile)) {
writerKey.write(keyRecord);
writerKey.flush();
}
messageKey = os.toByteArray();
}
} else {
final String keyString = keyValue.toString();
messageKey = keyString.getBytes(StandardCharsets.UTF_8);
}
return messageKey;
}
private void addHeaders(final FlowFile flowFile, final Map<String, String> additionalAttributes, final ProducerRecord<?, ?> record) {
if (attributeNameRegex == null) {
return;
@ -241,15 +430,14 @@ public class PublisherLease implements Closeable {
}
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
publish(flowFile, Collections.emptyMap(), messageKey, messageContent, topic, tracker, partition);
publish(flowFile, Collections.emptyList(), messageKey, messageContent, topic, tracker, partition);
}
protected void publish(final FlowFile flowFile, final Map<String, String> additionalAttributes, final byte[] messageKey, final byte[] messageContent,
protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent,
final String topic, final InFlightMessageTracker tracker, final Integer partition) {
final Integer moddedPartition = partition == null ? null : Math.abs(partition) % (producer.partitionsFor(topic).size());
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, moddedPartition, messageKey, messageContent);
addHeaders(flowFile, additionalAttributes, record);
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, moddedPartition, messageKey, messageContent, headers);
producer.send(record, new Callback() {
@Override

View File

@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import java.io.Closeable;
import java.nio.charset.Charset;
@ -41,12 +42,15 @@ public class PublisherPool implements Closeable {
private final boolean useTransactions;
private final Pattern attributeNameRegex;
private final Charset headerCharacterSet;
private final PublishStrategy publishStrategy;
private final RecordSetWriterFactory recordKeyWriterFactory;
private Supplier<String> transactionalIdSupplier;
private volatile boolean closed = false;
PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis,
final boolean useTransactions, final Supplier<String> transactionalIdSupplier, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
final boolean useTransactions, final Supplier<String> transactionalIdSupplier, final Pattern attributeNameRegex,
final Charset headerCharacterSet, final PublishStrategy publishStrategy, final RecordSetWriterFactory recordKeyWriterFactory) {
this.logger = logger;
this.publisherQueue = new LinkedBlockingQueue<>();
this.kafkaProperties = kafkaProperties;
@ -55,6 +59,8 @@ public class PublisherPool implements Closeable {
this.useTransactions = useTransactions;
this.attributeNameRegex = attributeNameRegex;
this.headerCharacterSet = headerCharacterSet;
this.publishStrategy = publishStrategy;
this.recordKeyWriterFactory = recordKeyWriterFactory;
this.transactionalIdSupplier = transactionalIdSupplier;
}
@ -79,7 +85,8 @@ public class PublisherPool implements Closeable {
}
final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties);
final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) {
final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger,
useTransactions, attributeNameRegex, headerCharacterSet, publishStrategy, recordKeyWriterFactory) {
private volatile boolean closed = false;
@Override

View File

@ -256,5 +256,132 @@
ssl.client.auth property.
</p>
<h2>Output Strategies</h2>
<div>
<p>This processor offers multiple output strategies (configured via processor property 'Output
Strategy') for converting Kafka records into FlowFiles.</p>
<ul>
<li>Output Strategy 'Use Content as Value' (the default) emits FlowFile records containing only the Kafka
record value.
</li>
<li>Output Strategy 'Use Wrapper' (new) emits FlowFile records containing the Kafka record key, value,
and headers, as well as additional metadata from the Kafka record.
</li>
</ul>
<p>The record schema that is used when 'Use Wrapper' is active is as follows (in Avro format):</p>
<pre>
<code>
{
"type": "record",
"name": "nifiRecord",
"namespace": "org.apache.nifi",
"fields": [{
"name": "key",
"type": [{
&lt; Schema is determined by the Key Record Reader, or will be "string" or "bytes", depending on the "Key Format" property (see below for more details) &gt;
}, "null"]
},
{
"name": "value",
"type": [
{
&lt; Schema is determined by the Value Record Reader &gt;
},
"null"
]
},
{
"name": "headers",
"type": [
{ "type": "map", "values": "string", "default": {}},
"null"]
},
{
"name": "metadata",
"type": [
{
"type": "record",
"name": "metadataType",
"fields": [
{ "name": "topic", "type": ["string", "null"] },
{ "name": "partition", "type": ["int", "null"] },
{ "name": "offset", "type": ["int", "null"] },
{ "name": "timestamp", "type": ["long", "null"] }
]
},
"null"
]
}
]
}
</code>
</pre>
<p>If the Output Strategy property is set to 'Use Wrapper', an additional processor configuration property
('Key Format') is activated. This property is used to specify how the Kafka Record's key should be written out to the FlowFile.
The possible values for 'Key Format' are as follows:</p>
<ul>
<li>'Byte Array' supplies the Kafka Record Key as a byte array, exactly as it is received in the Kafka record.</li>
<li>'String' converts the Kafka Record Key bytes into a string using the UTF-8 character encoding.
(Failure to parse the key bytes as UTF-8 will result in the record being routed to the
'parse.failure' relationship.)
</li>
<li>'Record' converts the Kafka Record Key bytes into a deserialized NiFi record, using the associated
'Key Record Reader' controller service.
</li>
</ul>
<p>
If the Key Format property is set to 'Record', an additional processor configuration property named 'Key Record Reader' is
made available. This property is used to specify the Record Reader to use in order to parse the Kafka Record's key as a Record.
</p>
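<p>
    As an illustration only (not part of the formal property documentation), the following Java sketch shows how these
    properties might be wired together in a test, mirroring the unit tests included with this change. The internal property
    names ('output-strategy', 'key-format', 'key-record-reader', 'record-reader', 'record-writer') and the 'USE_WRAPPER'
    value are taken from those tests; the broker address, topic, and group are placeholders.
</p>
<pre>
    <code>
// Sketch: ConsumeKafkaRecord_2_6 configured to emit wrapper records with the key parsed as a Record
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

public class ConsumeWrapperConfigSketch {
    public static TestRunner configure() throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(new ConsumeKafkaRecord_2_6());
        runner.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
        runner.setProperty("topic", "accounts");                     // placeholder topic
        runner.setProperty("group.id", "nifi-consumer-group");       // placeholder group

        // Value Record Reader and Record Value Writer controller services
        final JsonTreeReader valueReader = new JsonTreeReader();
        runner.addControllerService("record-reader", valueReader);
        runner.enableControllerService(valueReader);
        runner.setProperty("record-reader", "record-reader");

        final JsonRecordSetWriter valueWriter = new JsonRecordSetWriter();
        runner.addControllerService("record-writer", valueWriter);
        runner.enableControllerService(valueWriter);
        runner.setProperty("record-writer", "record-writer");

        // Emit the wrapper record and deserialize the Kafka key using the Key Record Reader
        final JsonTreeReader keyReader = new JsonTreeReader();
        runner.addControllerService("key-record-reader", keyReader);
        runner.enableControllerService(keyReader);
        runner.setProperty("output-strategy", "USE_WRAPPER");
        runner.setProperty("key-format", "record");
        runner.setProperty("key-record-reader", "key-record-reader");
        return runner;
    }
}
    </code>
</pre>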
<p>Here is an example of FlowFile content that is emitted by <code>JsonRecordSetWriter</code> when strategy "Use Wrapper" is active:</p>
<pre>
<code>
[
{
"key": {
"name": "Acme",
"number": "AC1234"
},
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number": "AC1234"
}
},
"headers": {
"attributeA": "valueA",
"attributeB": "valueB"
},
"metadata": {
"topic": "accounts",
"partition": 0,
"offset": 0,
"timestamp": 0
}
}
]
</code>
</pre>
<p>These new processor properties may be used to extend the capabilities of ConsumeKafkaRecord_2_6, by
optionally incorporating additional information from the Kafka record (key, headers, metadata) into the
outbound flowfile. Additionally, the Kafka records' keys may now be interpreted as records, rather than as a string.
This enables additional decision-making by downstream processors in your flow and enables handling of records where
the key is complex, such as an Avro record.</p>
<p>Note also that the choice of the 'Output Strategy' property affects the related properties
'Headers to Add as Attributes (Regex)' and 'Key Attribute Encoding'. Since Output Strategy 'Use
Wrapper' includes headers and keys in the FlowFile content, they are not also added to the FlowFile
attributes. These properties are therefore available only when 'Output Strategy' is set to 'Write
Value Only'.</p>
</div>
</body>
</html>

View File

@ -189,5 +189,888 @@
See the SSL section for a description of how to configure the SSL Context Service based on the
ssl.client.auth property.
</p>
<h2>Publish Strategy</h2>
<div>
<p>This processor includes optional properties that control how a Kafka Record's key and headers are determined:</p>
<ul>
<li>'Publish Strategy'</li>
<li>'Record Key Writer'</li>
</ul>
<p>'Publish Strategy' controls the mode used to convert the FlowFile record into a Kafka record.</p>
<ul>
<li>'Use Content as Record Value' (the default) - the content of the FlowFile Record becomes the content of the Kafka record. The Kafka record's key is determined by the 'Message Key
Field' property, and the Kafka record's headers are determined by the 'Attributes to Send as Headers (Regex)' property.</li>
<li>'Use Wrapper' - the content of the FlowFile record is a wrapper consisting of the Kafka record's key, value, headers, and metadata (topic and partition).</li>
</ul>
<p>
If Publish Strategy is set to 'Use Wrapper', two additional processor configuration properties are
made available: 'Record Key Writer' and 'Record Metadata Strategy'.
</p>
<p>
The 'Record Key Writer' property determines the Record Writer that should be used to serialize the Kafka record's key.
This may be used to emit the key as JSON, Avro, XML, or some other data format. If this property is not set, and the NiFi Record indicates that the key itself
is a Record, the FlowFile will be routed to the 'failure' relationship. If this property is not set and the NiFi Record's key is a Byte Array or a String (encoded as UTF-8), the
Kafka record's key will still be set accordingly.
</p>
<p>
The 'Record Metadata Strategy' specifies whether the Kafka topic and partition should come from the configured 'Topic Name' and 'Partition' / 'Partitioner class' properties,
or whether they should come from the Record's optional <code>metadata</code> field. If the value is set to 'Metadata From Record', the incoming FlowFile record is expected to have a field
named 'metadata'. That field is expected to be a Record with a 'topic' and a 'partition' field. If these fields are missing or invalid, the processor's 'Topic Name' and 'Partition' /
'Partitioner class' properties will be used instead.
</p>
<p>
Using the <code>metadata</code> field to convey the topic and partition has two advantages. Firstly, it pairs well with the ConsumeKafkaRecord_* processor, which produces
this same schema. This means that if data is consumed from one topic and pushed to another topic (or Kafka cluster), the data can be easily pinned to the same partition and topic name.
If the data should be pushed to a different topic, it can be easily updated using an UpdateRecord processor, for instance.
</p>
<p>
Secondly, because all records in a FlowFile are sent as a single Kafka transaction, this makes it possible to send records to multiple Kafka topics within one transaction.
</p>
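<p>
    As an illustration only, the following Java sketch shows how a publisher might be configured for the wrapper strategy in a
    test, modeled on the unit tests included with this change. The 'publish-strategy' name and 'USE_WRAPPER' value come from
    those tests; the 'record-key-writer', 'record-reader', and 'record-writer' property names and the broker/topic values are
    assumptions made for the example.
</p>
<pre>
    <code>
// Sketch: PublishKafkaRecord_2_6 configured to treat each FlowFile record as a key/value/headers/metadata wrapper
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;

import java.nio.charset.StandardCharsets;

public class PublishWrapperConfigSketch {
    public static TestRunner configure() throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(new PublishKafkaRecord_2_6());
        runner.setProperty("bootstrap.servers", "localhost:9092");   // placeholder broker
        runner.setProperty("topic", "accounts");                     // placeholder default topic

        // Record Reader / Writer for the FlowFile content (assumed property names)
        final JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService("record-reader", reader);
        runner.enableControllerService(reader);
        runner.setProperty("record-reader", "record-reader");

        final JsonRecordSetWriter writer = new JsonRecordSetWriter();
        runner.addControllerService("record-writer", writer);
        runner.enableControllerService(writer);
        runner.setProperty("record-writer", "record-writer");

        // Interpret each FlowFile record as a wrapper of key / value / headers / metadata
        runner.setProperty("publish-strategy", "USE_WRAPPER");

        // Serialize a Record-typed key as JSON (assumed property name for 'Record Key Writer')
        final JsonRecordSetWriter keyWriter = new JsonRecordSetWriter();
        runner.addControllerService("record-key-writer", keyWriter);
        runner.enableControllerService(keyWriter);
        runner.setProperty("record-key-writer", "record-key-writer");

        // Wrapper-shaped content; the optional metadata field can steer the topic per record
        final String wrapper = "{\"key\": {\"name\": \"Acme\"}, \"value\": {\"zip\": \"12345\"},"
                + " \"headers\": {\"a\": \"b\"}, \"metadata\": {\"topic\": \"topic1\"}}";
        runner.enqueue(wrapper.getBytes(StandardCharsets.UTF_8));
        // runner.run(1) would attempt delivery against the configured brokers
        return runner;
    }
}
    </code>
</pre>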
<h3>Examples</h3>
<p>
The below examples illustrate what will be sent to Kafka, given different configurations and FlowFile contents. These examples
all assume that JsonRecordSetWriter and JsonTreeReader will be used for the Record Readers and Writers.
</p>
<h4>Publish Strategy = 'Use Content as Record Value'</h4>
<p>
Given the processor configuration:
</p>
<table border="thin">
<tr>
<th>Processor Property</th>
<th>Configured Value</th>
</tr>
<tr>
<td>Message Key Field</td>
<td>account</td>
</tr>
<tr>
<td>Attributes to Send as Headers (Regex)</td>
<td>attribute.*</td>
</tr>
</table>
<p>
And a FlowFile with the content:
</p>
<pre>
<code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code>
</pre>
<p>
And attributes:
</p>
<table border="thin">
<tr>
<th>Attribute Name</th>
<th>Attribute Value</th>
</tr>
<tr>
<td>attributeA</td>
<td>valueA</td>
</tr>
<tr>
<td>attributeB</td>
<td>valueB</td>
</tr>
<tr>
<td>otherAttribute</td>
<td>otherValue</td>
</tr>
</table>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Record Key</th>
<td><code>{"name":"Acme","number":"AC1234"}</code></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>attributeA</td>
<td>valueA</td>
</tr>
<tr>
<td>attributeB</td>
<td>valueB</td>
</tr>
</table>
</td>
</tr>
</table>
<h4>Publish Strategy = 'Use Wrapper'</h4>
<p>
When the Publish Strategy is configured to 'Use Wrapper', each FlowFile Record is expected to adhere to a specific schema.
The Record may contain the fields <code>key</code>, <code>value</code>, and <code>headers</code>, as well as an optional fourth field named <code>metadata</code>.
The <code>key</code> may be a String, a byte array, or a Record. The <code>value</code> can be any Record. The <code>headers</code>
field is a Map whose values are Strings. The <code>metadata</code> field is a Record with two fields of interest: <code>topic</code> and <code>partition</code>. If these
fields are specified, they take precedence over the configured 'Topic Name', 'Partition', and 'Partitioner class' processor properties.
</p>
<h5>Example 1 - Key as String</h5>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"key": "Acme Holdings",
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"accountType": "enterprise",
"test": "true"
}
}</code>
</pre>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Record Key</th>
<td><code>Acme Holdings</code></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>accountType</td>
<td>enterprise</td>
</tr>
<tr>
<td>test</td>
<td>true</td>
</tr>
</table>
</td>
</tr>
</table>
<p>
Note that in this case, the headers and key come directly from the Record, not from FlowFile attributes.
If there is a desire to include some FlowFile attributes in the headers, this should be accomplished by using a Processor
upstream in order to inject those values into the <code>headers</code> field. For example, an UpdateRecord processor could be used
to easily add new fields to the <code>headers</code> Map.
</p>
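<p>
    For instance (an illustrative configuration, not taken from this commit), an upstream UpdateRecord processor with
    'Replacement Value Strategy' set to 'Literal Value' could copy a FlowFile attribute into the <code>headers</code> map
    by adding a user-defined property whose name is a RecordPath and whose value uses Expression Language:
</p>
<table border="thin">
    <tr>
        <th>UpdateRecord Property Name</th>
        <th>UpdateRecord Property Value</th>
    </tr>
    <tr>
        <td>/headers/attributeA</td>
        <td>${attributeA}</td>
    </tr>
</table>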
<h5>Example 2 - Key as Record</h5>
<p>
Additionally, we may choose to use a more complex value for the record key. The key itself may be a record. This is sometimes used to write the record key
either as JSON or as Avro. In this example, we assume that the 'Record Key Writer' property is set to a JsonRecordSetWriter.
</p>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"key": {
"accountName": "Acme Holdings",
"accountHolder": "John Doe",
"accountId": "280182830-A009"
},
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
}
}</code>
</pre>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Record Key</th>
<td><code>{"accountName":"Acme Holdings","accountHolder":"John Doe","accountId":"280182830-A009"}</code></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td></td>
</tr>
</table>
<p>
Note here that the Record Key is written as JSON, because the 'Record Key Writer' property is configured to write JSON. It could just as easily be Avro.
</p>
<p>
Also note that if the 'Record Key Writer' had not been set, the FlowFile would have been routed to the 'failure' relationship because the key is a Record.
</p>
<p>
Finally, note here that the <code>headers</code> field is missing. This is acceptable and no headers will be added to the Kafka record.
</p>
<h5>Example 3 - Key as Byte Array</h5>
<p>
We can also have a Record whose <code>key</code> field is an array of bytes. In this case, the 'Record Key Writer' property is not used.
</p>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"key": [65, 27, 10, 20, 11, 57, 88, 19, 65],
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"otherField": {
"a": "b"
}
}</code>
</pre>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Record Key</th>
<td><code>0x411b0a140b39581341</code></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td></td>
</tr>
</table>
<p>
In this case, the byte array that is specified for the key is provided to the Kafka Record as a byte array without changes (in the table, it is simply represented as Hex).
</p>
<p>
Finally, note here that the <code>headers</code> field is missing and an extraneous field, <code>otherField</code>, is present.
This is acceptable: no headers will be added to the Kafka record, and the <code>otherField</code> field is simply ignored.
</p>
<h5>Example 4 - No Key</h5>
<p>
We can also have a Record whose <code>key</code> field is null or missing. In this case, the 'Record Key Writer' property is not used.
</p>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"a": "b",
"c": {
"d": "e"
}
}
}</code>
</pre>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Record Key</th>
<td></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>a</td>
<td>b</td>
</tr>
<tr>
<td>c</td>
<td>MapRecord[{d=e}]</td>
</tr>
</table>
</td>
</tr>
</table>
<p>
In this case, the key is not present, so the Kafka record that is produced has no key associated with it.
</p>
<p>
Note also that the <code>headers</code> field has the expected value for the <code>a</code> header, but the <code>c</code>
header has the value <code>MapRecord[{d=e}]</code>. This is because the <code>headers</code> field is always expected to be a Map
with String values. By providing a Record for the <code>c</code> element, we have violated that contract. NiFi compensates
by creating a String representation of the Record, even though it is unlikely to be the representation that the user expects.
</p>
<h5>Example 5 - Topic provided in Record</h5>
<p>
If the Metadata field is provided in the FlowFile's Record, it will be used to determine the Topic and the Partition that the Records are written to.
</p>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"a": "b"
},
"metadata": {
"topic": "topic1"
}
}</code>
</pre>
<p>
And considering that the processor properties are configured as:
</p>
<table border="thin">
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>Topic Name</td>
<td>My Topic</td>
</tr>
<tr>
<td>Partition</td>
<td>2</td>
</tr>
<tr>
<td>Record Metadata Strategy</td>
<td>Metadata From Record</td>
</tr>
</table>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Kafka Topic</th>
<td>topic1</td>
</tr>
<tr>
<th>Topic Partition</th>
<td>2</td>
</tr>
<tr>
<th>Record Key</th>
<td></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>a</td>
<td>b</td>
</tr>
</table>
</td>
</tr>
</table>
<p>
Note that the topic name comes directly from the FlowFile record, and the configured topic name ("My Topic") is ignored. However, if either the "metadata" field or its "topic" sub-field
were missing, the configured topic name ("My Topic") would be used.
</p>
<h5>Example 6 - Partition provided in Record</h5>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"a": "b"
},
"metadata": {
"partition": 6
}
}</code>
</pre>
<p>
And considering that the processor properties are configured as:
</p>
<table border="thin">
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>Topic Name</td>
<td>My Topic</td>
</tr>
<tr>
<td>Partition</td>
<td>2</td>
</tr>
<tr>
<td>Record Metadata Strategy</td>
<td>Metadata From Record</td>
</tr>
</table>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Kafka Topic</th>
<td>My Topic</td>
</tr>
<tr>
<th>Topic Partition</th>
<td>6</td>
</tr>
<tr>
<th>Record Key</th>
<td></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>a</td>
<td>b</td>
</tr>
</table>
</td>
</tr>
</table>
<h5>Example 7 - Topic and Partition provided in Record</h5>
<p>
If the Metadata field is provided in the FlowFile's Record, it will be used to determine the Topic and the Partition that the Records are written to.
</p>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"a": "b"
},
"metadata": {
"topic": "topic1",
"partition": 0
}
}</code>
</pre>
<p>
And considering that the processor properties are configured as:
</p>
<table border="thin">
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>Topic Name</td>
<td>My Topic</td>
</tr>
<tr>
<td>Partition</td>
<td>2</td>
</tr>
<tr>
<td>Record Metadata Strategy</td>
<td>Metadata From Record</td>
</tr>
</table>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Kafka Topic</th>
<td>topic1</td>
</tr>
<tr>
<th>Topic Partition</th>
<td>0</td>
</tr>
<tr>
<th>Record Key</th>
<td></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>a</td>
<td>b</td>
</tr>
</table>
</td>
</tr>
</table>
<p>
In this case, both the topic name and the partition are explicitly defined within the incoming Record, and those will be used.
</p>
<h5>Example 8 - Invalid metadata provided in Record</h5>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"a": "b"
},
"metadata": "hello"
}</code>
</pre>
<p>
And considering that the processor properties are configured as:
</p>
<table border="thin">
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>Topic Name</td>
<td>My Topic</td>
</tr>
<tr>
<td>Partition</td>
<td>2</td>
</tr>
<tr>
<td>Record Metadata Strategy</td>
<td>Metadata From Record</td>
</tr>
</table>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Kafka Topic</th>
<td>My Topic</td>
</tr>
<tr>
<th>Topic Partition</th>
<td>2</td>
</tr>
<tr>
<th>Record Key</th>
<td></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>a</td>
<td>b</td>
</tr>
</table>
</td>
</tr>
</table>
<p>
In this case, the "metadata" field in the Record is ignored because it is not itself a Record.
</p>
<h5>Example 9 - Use Configured Values for Metadata</h5>
<p>
Given a FlowFile with the content:
</p>
<pre>
<code>{
"value": {
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number":"AC1234"
}
},
"headers": {
"a": "b"
},
"metadata": {
"topic": "topic1",
"partition": 6
}
}</code>
</pre>
<p>
And considering that the processor properties are configured as:
</p>
<table border="thin">
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>Topic Name</td>
<td>My Topic</td>
</tr>
<tr>
<td>Partition</td>
<td>2</td>
</tr>
<tr>
<td>Record Metadata Strategy</td>
<td>Use Configured Values</td>
</tr>
</table>
<p>
The record that is produced to Kafka will have the following characteristics:
</p>
<table border="thin">
<tr>
<th>Kafka Topic</th>
<td>My Topic</td>
</tr>
<tr>
<th>Topic Partition</th>
<td>2</td>
</tr>
<tr>
<th>Record Key</th>
<td></td>
</tr>
<tr>
<th>Record Value</th>
<td><code>{"address":"1234 First Street","zip":"12345","account":{"name":"Acme","number":"AC1234"}}</code></td>
</tr>
<tr>
<th>Record Headers</th>
<td>
<table border="thin">
<tr>
<td>Header Name</td>
<td>Header Value</td>
</tr>
<tr>
<td>a</td>
<td>b</td>
</tr>
</table>
</td>
</tr>
</table>
<p>
In this case, the "metadata" field specifies both the topic and the partition. However, it is ignored in favor of the processor properties 'Topic' and 'Partition' because
the property 'Record Metadata Strategy' is set to 'Use Configured Values'.
</p>
</div>
</body>
</html>

View File

@ -0,0 +1,517 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestConsumeKafkaMock {
/**
* JSON serialization helper.
*/
private final ObjectMapper mapper = new ObjectMapper();
/**
* Kafka server endpoint (mock) for test interactions.
*/
private static final String BOOTSTRAP_SERVER = "localhost:59092";
/**
* Ensure fresh data for each test run.
*/
private static final long TIMESTAMP = System.currentTimeMillis();
/**
* The name of the test kafka topic to be created.
*/
private static final String TEST_TOPIC = "nifi-consume-" + TIMESTAMP;
/**
* The name of the test kafka group to use.
*/
private static final String TEST_GROUP = "nifi-group-" + TIMESTAMP;
@Test
public void testConsumeRecordNullKey() throws JsonProcessingException, InitializationException {
final ObjectNode node = mapper.createObjectNode().put("a", 1).put("b", "2");
final String value = mapper.writeValueAsString(node);
final ArrayNode nodeRecordSet = mapper.createArrayNode().add(node);
final String valueRecordSet = mapper.writeValueAsString(nodeRecordSet);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TEST_TOPIC, 0, 0, null, value.getBytes(UTF_8));
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
assertEquals(valueRecordSet, flowFile.getContent());
assertNull(flowFile.getAttribute("kafka.key"));
assertEquals("0", flowFile.getAttribute("kafka.partition"));
assertEquals(TEST_TOPIC, flowFile.getAttribute("kafka.topic"));
assertEquals(TEST_GROUP, flowFile.getAttribute("kafka.consumer.id"));
}
@Test
public void testConsumeRecordTextKey() throws Exception {
final String key = "a-kafka-record-key";
final ObjectNode node = mapper.createObjectNode().put("c", 3).put("d", "4");
final String value = mapper.writeValueAsString(node);
final ArrayNode nodeRecordSet = mapper.createArrayNode().add(node);
final String valueRecordSet = mapper.writeValueAsString(nodeRecordSet);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TEST_TOPIC, 0, 0, key.getBytes(UTF_8), value.getBytes(UTF_8));
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
assertEquals(valueRecordSet, flowFile.getContent());
assertEquals(key, flowFile.getAttribute("kafka.key"));
assertEquals("0", flowFile.getAttribute("kafka.partition"));
assertEquals(TEST_TOPIC, flowFile.getAttribute("kafka.topic"));
assertEquals(TEST_GROUP, flowFile.getAttribute("kafka.consumer.id"));
}
@Test
public void testConsumeRecordJsonKeyNoKeyReader() throws Exception {
final ObjectNode nodeKey = mapper.createObjectNode().put("key", true);
final String key = mapper.writeValueAsString(nodeKey);
final ObjectNode node = mapper.createObjectNode().put("e", 5).put("f", "6");
final String value = mapper.writeValueAsString(node);
final ArrayNode nodeRecordSet = mapper.createArrayNode().add(node);
final String valueRecordSet = mapper.writeValueAsString(nodeRecordSet);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TEST_TOPIC, 0, 0, key.getBytes(UTF_8), value.getBytes(UTF_8));
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
assertEquals(valueRecordSet, flowFile.getContent());
assertEquals(key, flowFile.getAttribute("kafka.key"));
assertEquals("0", flowFile.getAttribute("kafka.partition"));
assertEquals(TEST_TOPIC, flowFile.getAttribute("kafka.topic"));
assertEquals(TEST_GROUP, flowFile.getAttribute("kafka.consumer.id"));
}
@Test
public void testConsumeRecordWrapperStrategyKeyFormatDefault() throws Exception {
final ObjectNode nodeToKafkaKey = mapper.createObjectNode().put("key", true);
final String textToKafkaKey = mapper.writeValueAsString(nodeToKafkaKey);
final ObjectNode nodeToKafkaValue = mapper.createObjectNode().put("g", 7).put("h", "8");
final String textToKafkaValue = mapper.writeValueAsString(nodeToKafkaValue);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TEST_TOPIC, 0, 0L,
0L, TimestampType.CREATE_TIME, 0L, textToKafkaKey.length(), textToKafkaValue.length(),
textToKafkaKey.getBytes(UTF_8), textToKafkaValue.getBytes(UTF_8), getKafkaHeaders());
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
final String keyReaderId = "key-record-reader";
final RecordReaderFactory keyReaderService = new JsonTreeReader();
runner.addControllerService(keyReaderId, keyReaderService);
runner.enableControllerService(keyReaderService);
runner.setProperty(keyReaderId, keyReaderId);
runner.setProperty("output-strategy", OutputStrategy.USE_WRAPPER.name());
runner.setProperty("key-format", "byte-array");
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
// consume strategy "use-wrapper" emits ArrayNode due to JsonRecordSetWriter
final JsonNode nodeFlowFile = mapper.readTree(flowFile.getContent());
assertTrue(nodeFlowFile instanceof ArrayNode);
assertEquals(1, nodeFlowFile.size());
// extract the NiFi json object representation of Kafka input record
final JsonNode flowFileValue = nodeFlowFile.iterator().next();
// wrapper object contains "key", "value", "headers", "metadata"
assertEquals(4, flowFileValue.size());
final JsonNode nodeWrapperKey = Objects.requireNonNull(flowFileValue.get("key"));
final JsonNode nodeWrapperValue = Objects.requireNonNull(flowFileValue.get("value"));
final JsonNode nodeWrapperHeaders = Objects.requireNonNull(flowFileValue.get("headers"));
final JsonNode nodeWrapperMetadata = Objects.requireNonNull(flowFileValue.get("metadata"));
// validate headers
assertEquals(2, nodeWrapperHeaders.size());
final JsonNode header1 = nodeWrapperHeaders.get("header1");
assertNotNull(header1);
assertEquals("value1", header1.asText());
assertNotNull(nodeWrapperHeaders.get("header2"));
// validate metadata
assertEquals(4, nodeWrapperMetadata.size());
assertEquals(TEST_TOPIC, nodeWrapperMetadata.get("topic").asText());
assertEquals(0, nodeWrapperMetadata.get("partition").asInt());
assertNotNull(nodeWrapperMetadata.get("offset"));
assertNotNull(nodeWrapperMetadata.get("timestamp"));
// validate value
assertTrue(nodeWrapperValue instanceof ObjectNode);
assertEquals(textToKafkaValue, mapper.writeValueAsString(nodeWrapperValue));
// validate key
assertTrue(nodeWrapperKey instanceof ArrayNode);
ArrayNode arrayNodeKey = (ArrayNode) nodeWrapperKey;
assertEquals(textToKafkaKey.length(), arrayNodeKey.size());
final ByteArrayOutputStream os = new ByteArrayOutputStream();
arrayNodeKey.forEach(b -> os.write(b.asInt()));
assertEquals(textToKafkaKey, new String(os.toByteArray(), UTF_8));
// validate flowfile attributes
assertNull(flowFile.getAttribute("foo"));
assertEquals(TEST_TOPIC, flowFile.getAttribute("kafka.topic"));
assertEquals(TEST_GROUP, flowFile.getAttribute("kafka.consumer.id"));
assertEquals(textToKafkaKey, flowFile.getAttribute("kafka.key"));
}
@Test
public void testConsumeRecordWrapperStrategyKeyFormatDefaultHeaderNonUTF8() throws Exception {
final ObjectNode nodeToKafkaKey = mapper.createObjectNode().put("key", true);
final String textToKafkaKey = mapper.writeValueAsString(nodeToKafkaKey);
final ObjectNode nodeToKafkaValue = mapper.createObjectNode().put("i", 9).put("j", "10");
final String textToKafkaValue = mapper.writeValueAsString(nodeToKafkaValue);
final RecordHeaders headers = new RecordHeaders(Arrays.asList(
new RecordHeader("header1", "nameÄËÖÜ".getBytes(ISO_8859_1)),
new RecordHeader("header2", "value2".getBytes(UTF_8))));
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TEST_TOPIC, 0, 0L,
0L, TimestampType.CREATE_TIME, 0L, textToKafkaKey.length(), textToKafkaValue.length(),
textToKafkaKey.getBytes(UTF_8), textToKafkaValue.getBytes(UTF_8), headers);
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
final String keyReaderId = "key-record-reader";
final RecordReaderFactory keyReaderService = new JsonTreeReader();
runner.addControllerService(keyReaderId, keyReaderService);
runner.enableControllerService(keyReaderService);
runner.setProperty(keyReaderId, keyReaderId);
runner.setProperty("output-strategy", OutputStrategy.USE_WRAPPER.name());
runner.setProperty("key-format", "byte-array");
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
// consume strategy "use-wrapper" emits ArrayNode due to JsonRecordSetWriter
final JsonNode nodeFlowFile = mapper.readTree(flowFile.getContent());
assertTrue(nodeFlowFile instanceof ArrayNode);
assertEquals(1, nodeFlowFile.size());
// extract the NiFi json object representation of Kafka input record
final JsonNode flowFileValue = nodeFlowFile.iterator().next();
// wrapper object contains "key", "value", "headers", "metadata"
assertEquals(4, flowFileValue.size());
final JsonNode nodeWrapperHeaders = Objects.requireNonNull(flowFileValue.get("headers"));
// validate headers
assertEquals(2, nodeWrapperHeaders.size());
final JsonNode header1 = nodeWrapperHeaders.get("header1");
assertNotNull(header1);
final String expected = new String("nameÄËÖÜ".getBytes(ISO_8859_1), UTF_8);
assertEquals(expected, header1.asText());
assertNotNull(nodeWrapperHeaders.get("header2"));
}
@Test
public void testConsumeRecordWrapperStrategyKeyFormatString() throws Exception {
final ObjectNode nodeToKafkaKey = mapper.createObjectNode().put("key", true);
final String textToKafkaKey = mapper.writeValueAsString(nodeToKafkaKey);
final ObjectNode nodeToKafkaValue = mapper.createObjectNode().put("k", 11).put("l", "12");
final String textToKafkaValue = mapper.writeValueAsString(nodeToKafkaValue);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TEST_TOPIC, 0, 0L,
0L, TimestampType.CREATE_TIME, 0L, textToKafkaKey.length(), textToKafkaValue.length(),
textToKafkaKey.getBytes(UTF_8), textToKafkaValue.getBytes(UTF_8), getKafkaHeaders());
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
final String keyReaderId = "key-record-reader";
final RecordReaderFactory keyReaderService = new JsonTreeReader();
runner.addControllerService(keyReaderId, keyReaderService);
runner.enableControllerService(keyReaderService);
runner.setProperty(keyReaderId, keyReaderId);
runner.setProperty("output-strategy", OutputStrategy.USE_WRAPPER.name());
runner.setProperty("key-format", "string");
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
// consume strategy "use-wrapper" emits ArrayNode due to JsonRecordSetWriter
final JsonNode nodeFlowFile = mapper.readTree(flowFile.getContent());
assertTrue(nodeFlowFile instanceof ArrayNode);
assertEquals(1, nodeFlowFile.size());
// extract the NiFi json object representation of Kafka input record
final JsonNode flowFileValue = nodeFlowFile.iterator().next();
// wrapper object contains "key", "value", "headers", "metadata"
assertEquals(4, flowFileValue.size());
final JsonNode nodeWrapperKey = Objects.requireNonNull(flowFileValue.get("key"));
// validate key
assertTrue(nodeWrapperKey instanceof TextNode);
TextNode textNodeKey = (TextNode) nodeWrapperKey;
assertEquals(textToKafkaKey.length(), textNodeKey.asText().length());
assertEquals(textToKafkaKey, textNodeKey.textValue());
// validate flowfile attributes
assertNull(flowFile.getAttribute("foo"));
assertEquals(TEST_TOPIC, flowFile.getAttribute("kafka.topic"));
assertEquals(TEST_GROUP, flowFile.getAttribute("kafka.consumer.id"));
assertEquals(textToKafkaKey, flowFile.getAttribute("kafka.key"));
}
@Test
public void testConsumeRecordWrapperStrategyKeyFormatRecord() throws Exception {
final ObjectNode nodeToKafkaKey = mapper.createObjectNode().put("key", true);
final String textToKafkaKey = mapper.writeValueAsString(nodeToKafkaKey);
final ObjectNode nodeToKafkaValue = mapper.createObjectNode().put("k", 11).put("l", "12");
final String textToKafkaValue = mapper.writeValueAsString(nodeToKafkaValue);
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(TEST_TOPIC, 0, 0L,
0L, TimestampType.CREATE_TIME, 0L, textToKafkaKey.length(), textToKafkaValue.length(),
textToKafkaKey.getBytes(UTF_8), textToKafkaValue.getBytes(UTF_8), getKafkaHeaders());
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
final String keyReaderId = "key-record-reader";
final RecordReaderFactory keyReaderService = new JsonTreeReader();
runner.addControllerService(keyReaderId, keyReaderService);
runner.enableControllerService(keyReaderService);
runner.setProperty(keyReaderId, keyReaderId);
runner.setProperty("output-strategy", OutputStrategy.USE_WRAPPER.name());
runner.setProperty("key-format", "record");
runner.setProperty("key-record-reader", "record-reader");
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
// consume strategy "use-wrapper" emits ArrayNode due to JsonRecordSetWriter
final JsonNode nodeFlowFile = mapper.readTree(flowFile.getContent());
assertTrue(nodeFlowFile instanceof ArrayNode);
assertEquals(1, nodeFlowFile.size());
// extract the NiFi json object representation of Kafka input record
final JsonNode flowFileValue = nodeFlowFile.iterator().next();
// wrapper object contains "key", "value", "headers", "metadata"
assertEquals(4, flowFileValue.size());
final JsonNode nodeWrapperKey = Objects.requireNonNull(flowFileValue.get("key"));
final JsonNode nodeWrapperValue = Objects.requireNonNull(flowFileValue.get("value"));
final JsonNode nodeWrapperHeaders = Objects.requireNonNull(flowFileValue.get("headers"));
final JsonNode nodeWrapperMetadata = Objects.requireNonNull(flowFileValue.get("metadata"));
// validate headers
assertEquals(2, nodeWrapperHeaders.size());
final JsonNode header1 = nodeWrapperHeaders.get("header1");
assertNotNull(header1);
assertEquals("value1", header1.asText());
assertNotNull(nodeWrapperHeaders.get("header2"));
// validate metadata
assertEquals(4, nodeWrapperMetadata.size());
assertEquals(TEST_TOPIC, nodeWrapperMetadata.get("topic").asText());
assertEquals(0, nodeWrapperMetadata.get("partition").asInt());
assertNotNull(nodeWrapperMetadata.get("offset"));
assertNotNull(nodeWrapperMetadata.get("timestamp"));
// validate value
assertTrue(nodeWrapperValue instanceof ObjectNode);
assertEquals(textToKafkaValue, mapper.writeValueAsString(nodeWrapperValue));
// validate key
assertTrue(nodeWrapperKey instanceof ObjectNode);
ObjectNode objectNodeKey = (ObjectNode) nodeWrapperKey;
assertTrue(objectNodeKey.get("key").asBoolean());
// validate flowfile attributes
assertNull(flowFile.getAttribute("foo"));
assertEquals(TEST_TOPIC, flowFile.getAttribute("kafka.topic"));
assertEquals(TEST_GROUP, flowFile.getAttribute("kafka.consumer.id"));
assertEquals("0", flowFile.getAttribute("kafka.partition"));
assertEquals(textToKafkaKey, flowFile.getAttribute("kafka.key"));
}
@Test
public void testConsumeRecordWrapperStrategyStringKeyNullValue() throws InitializationException, JsonProcessingException {
final Headers headers = new RecordHeaders();
headers.add(new RecordHeader("headerA", "valueA".getBytes(UTF_8)));
final String key = "a-string-key";
final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
TEST_TOPIC, 0, 0, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0L, key.length(), 0, key.getBytes(UTF_8), null, headers);
final ConsumerRecords<byte[], byte[]> consumerRecords = getConsumerRecords(record);
final TestRunner runner = getTestRunner(consumerRecords, TEST_TOPIC, TEST_GROUP);
runner.setProperty("output-strategy", OutputStrategy.USE_WRAPPER.name());
runner.setProperty("key-format", "string");
runner.run(1);
runner.assertAllFlowFilesTransferred(ConsumeKafkaRecord_2_6.REL_SUCCESS, 1);
final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile = flowFiles.iterator().next();
final JsonNode nodeFlowFiles = mapper.readTree(flowFile.getContent());
final JsonNode nodeFlowFile = nodeFlowFiles.iterator().next();
final JsonNode nodeWrapperKey = Objects.requireNonNull(nodeFlowFile.get("key"));
assertEquals(key, nodeWrapperKey.asText());
assertTrue(nodeFlowFile.get("value").isNull());
final JsonNode nodeWrapperHeaders = Objects.requireNonNull(nodeFlowFile.get("headers"));
assertEquals("valueA", nodeWrapperHeaders.get("headerA").asText());
}
/**
* Construct a test runner that simulates Kafka interactions.
*/
private TestRunner getTestRunner(final ConsumerRecords<byte[], byte[]> consumerRecords,
final String topic, final String group) throws InitializationException {
final ConsumeKafkaRecord_2_6 processor = new ConsumeKafkaRecord_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return getConsumerPool(consumerRecords, context, log);
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, BOOTSTRAP_SERVER);
runner.setProperty("topic", topic);
runner.setProperty("topic_type", "names");
runner.setProperty(ConsumerConfig.GROUP_ID_CONFIG, group);
runner.setProperty("auto.offset.reset", "earliest");
final String readerId = "record-reader";
final RecordReaderFactory readerService = new JsonTreeReader();
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
runner.setProperty(readerId, readerId);
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(writerId, writerId);
return runner;
}
/**
* Fabricate a {@link ConsumerPool} that uses a mock {@link Consumer} to simulate Kafka interactions.
*/
private ConsumerPool getConsumerPool(final ConsumerRecords<byte[], byte[]> consumerRecords,
final ProcessContext context, final ComponentLog logger) {
final RecordReaderFactory readerFactory = context.getProperty("record-reader")
.asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty("record-writer")
.asControllerService(RecordSetWriterFactory.class);
final String topic = context.getProperty("topic").getValue();
final Pattern patternTopic = (topic == null) ? null : Pattern.compile(topic);
final String groupId = context.getProperty(ConsumerConfig.GROUP_ID_CONFIG).getValue();
final OutputStrategy outputStrategy = OutputStrategy.valueOf(context.getProperty("output-strategy").getValue());
final String keyFormat = context.getProperty("key-format").getValue();
final RecordReaderFactory keyReaderFactory = context.getProperty("key-record-reader")
.asControllerService(RecordReaderFactory.class);
return new ConsumerPool(
1,
readerFactory,
writerFactory,
Collections.emptyMap(),
patternTopic,
100L,
"ssl",
"localhost",
logger,
true,
UTF_8,
null,
false,
UTF_8.name().toLowerCase(Locale.ROOT),
null,
true,
outputStrategy,
keyFormat,
keyReaderFactory) {
@Override
protected Consumer<byte[], byte[]> createKafkaConsumer() {
return getConsumer(groupId, consumerRecords);
}
};
}
public interface ConsumerBB extends Consumer<byte[], byte[]> {
}
/**
* Mock Kafka {@link Consumer} to be injected into NiFi processor for testing.
*/
private Consumer<byte[], byte[]> getConsumer(final String groupId, final ConsumerRecords<byte[], byte[]> records) {
final Consumer<byte[], byte[]> consumer = mock(ConsumerBB.class);
// signal polling to stop by returning an empty records response
when(consumer.poll(any())).thenReturn(records).thenReturn(getConsumerRecords());
when(consumer.groupMetadata()).thenReturn(new ConsumerGroupMetadata(groupId));
return consumer;
}
/**
* Fabricate a {@link ConsumerRecords} from a few {@link ConsumerRecord}.
*/
@SafeVarargs
private final ConsumerRecords<byte[], byte[]> getConsumerRecords(final ConsumerRecord<byte[], byte[]>... records) {
final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
for (ConsumerRecord<byte[], byte[]> record : records) {
final TopicPartition partition = new TopicPartition(record.topic(), record.partition());
map.put(partition, Collections.singletonList(record));
}
return new ConsumerRecords<>(map);
}
/**
* Construct a {@link Header} collection to include in Kafka event.
*/
private Headers getKafkaHeaders() {
return new RecordHeaders(Arrays.asList(
new RecordHeader("header1", "value1".getBytes(UTF_8)),
new RecordHeader("header2", "value2".getBytes(UTF_8))));
}
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.KEY_AS_RECORD;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.OUTPUT_USE_VALUE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.OUTPUT_USE_WRAPPER;
import static org.mockito.Mockito.mock;
public class TestConsumeKafkaRecordKey_2_6 {
private ConsumerLease mockLease = null;
private ConsumerPool mockConsumerPool = null;
private TestRunner runner;
@BeforeEach
public void setup() throws InitializationException {
mockLease = mock(ConsumerLease.class);
mockConsumerPool = mock(ConsumerPool.class);
ConsumeKafkaRecord_2_6 proc = new ConsumeKafkaRecord_2_6() {
@Override
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
return mockConsumerPool;
}
};
runner = TestRunners.newTestRunner(proc);
runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
final String readerId = "record-reader";
final MockRecordParser readerService = new MockRecordParser();
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(ConsumeKafkaRecord_2_6.RECORD_READER, readerId);
runner.setProperty(ConsumeKafkaRecord_2_6.RECORD_WRITER, writerId);
}
@Test
public void testConfigValidation() {
runner.assertNotValid();
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.assertValid();
runner.setProperty(ConsumeKafkaRecord_2_6.OUTPUT_STRATEGY, "foo");
runner.assertNotValid();
runner.setProperty(ConsumeKafkaRecord_2_6.OUTPUT_STRATEGY, OUTPUT_USE_VALUE);
runner.assertValid();
runner.setProperty(ConsumeKafkaRecord_2_6.OUTPUT_STRATEGY, OUTPUT_USE_WRAPPER);
runner.assertValid();
runner.setProperty(ConsumeKafkaRecord_2_6.KEY_FORMAT, "foo");
runner.assertNotValid();
runner.setProperty(ConsumeKafkaRecord_2_6.KEY_FORMAT, KEY_AS_RECORD);
runner.assertValid();
runner.setProperty(ConsumeKafkaRecord_2_6.KEY_RECORD_READER, "no-record-reader");
runner.assertNotValid();
runner.setProperty(ConsumeKafkaRecord_2_6.KEY_RECORD_READER, "record-reader");
runner.assertValid();
}
@Test
public void testConsumeOneCallNoData() {
runner.setProperty(ConsumeKafkaRecord_2_6.TOPICS, "foo");
runner.setProperty(ConsumeKafkaRecord_2_6.GROUP_ID, "foo");
runner.run(1, true);
runner.assertTransferCount(ConsumeKafkaRecord_2_6.REL_SUCCESS, 0);
runner.assertTransferCount(ConsumeKafkaRecord_2_6.REL_PARSE_FAILURE, 0);
}
}

View File

@ -0,0 +1,372 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.Test;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPublishKafkaMock {
private static long ordinal = 0L;
/**
* JSON serialization helper.
*/
private final ObjectMapper mapper = new ObjectMapper();
/**
* Ensure fresh data for each test run.
*/
private static final long TIMESTAMP = System.currentTimeMillis();
/**
* The name of the test kafka topic to be created.
*/
private static final String TEST_TOPIC_PUBLISH = "nifi-publish-" + TIMESTAMP;
@Test
public void testPublishRecordNullKey() throws JsonProcessingException, InitializationException {
// create flowfile to publish
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
final String value = mapper.writeValueAsString(node);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("attribute-name-regex", ".*A");
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next();
assertEquals(TEST_TOPIC_PUBLISH, record.topic());
final Headers headers = record.headers();
assertEquals(1, headers.toArray().length);
assertEquals("attrValueA", new String(headers.lastHeader("attrKeyA").value(), UTF_8));
assertNull(record.key());
assertEquals(value, new String(record.value(), UTF_8));
}
@Test
public void testPublishRecordStringKey() throws JsonProcessingException, InitializationException {
// create flowfile to publish
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
attributes.put("messageKey", "this-is-a-key");
final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
final String value = mapper.writeValueAsString(node);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("attribute-name-regex", ".*B");
runner.setProperty("message-key-field", "recordB");
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next();
assertEquals(TEST_TOPIC_PUBLISH, record.topic());
final Headers headers = record.headers();
assertEquals(1, headers.toArray().length);
assertEquals("attrValueB", new String(headers.lastHeader("attrKeyB").value(), UTF_8));
assertNotNull(record.key());
assertEquals("valueB", new String(record.key(), UTF_8));
assertNotNull(record.value());
assertEquals(value, new String(record.value(), UTF_8));
}
@Test
public void testPublishRecordWrapperStrategyNullKey() throws JsonProcessingException, InitializationException {
// create flowfile to publish
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
attributes.put("messageKey", "this-is-a-key");
final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
final String value = mapper.writeValueAsString(node);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("attribute-name-regex", "attr.*");
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next();
assertEquals(TEST_TOPIC_PUBLISH, record.topic());
final Headers headers = record.headers();
assertEquals(2, headers.toArray().length);
assertEquals("attrValueA", new String(headers.lastHeader("attrKeyA").value(), UTF_8));
assertEquals("attrValueB", new String(headers.lastHeader("attrKeyB").value(), UTF_8));
assertNull(record.key());
assertNotNull(record.value());
assertEquals(value, new String(record.value(), UTF_8));
}
@Test
public void testPublishRecordWrapperStrategyStringKey() throws JsonProcessingException, InitializationException {
// create flowfile to publish
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
attributes.put("messageKey", "this-is-a-key");
final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
final String value = mapper.writeValueAsString(node);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("attribute-name-regex", ".*B");
runner.setProperty("message-key-field", "recordB");
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next();
assertEquals(TEST_TOPIC_PUBLISH, record.topic());
final Headers headers = record.headers();
assertEquals(1, headers.toArray().length);
assertEquals("attrValueB", new String(headers.lastHeader("attrKeyB").value(), UTF_8));
assertNotNull(record.key());
assertEquals("valueB", new String(record.key(), UTF_8));
assertNotNull(record.value());
assertEquals(value, new String(record.value(), UTF_8));
}
@Test
public void testPublishRecordWrapperStrategyStringKeyRecordKeyWriter() throws JsonProcessingException, InitializationException {
// create flowfile to publish
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
attributes.put("messageKey", "this-is-a-key");
final ObjectNode node = mapper.createObjectNode().put("recordA", 1).put("recordB", "valueB");
final String value = mapper.writeValueAsString(node);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("attribute-name-regex", ".*B");
runner.setProperty("message-key-field", "recordB");
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty("record-key-writer", "record-writer");
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> producedRecord = producedRecords.iterator().next();
assertEquals("valueB", new String(producedRecord.key(), UTF_8));
assertEquals(value, new String(producedRecord.value(), UTF_8));
final List<MockFlowFile> success = runner.getFlowFilesForRelationship(PublishKafkaRecord_2_6.REL_SUCCESS);
final MockFlowFile flowFile1 = success.iterator().next();
assertNotNull(flowFile1.getAttribute("uuid"));
}
@Test
public void testPublishRecordWrapperStrategyRecordKeyRecordKeyWriter() throws JsonProcessingException, InitializationException {
// create flowfile to publish
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA", "attrValueA");
attributes.put("attrKeyB", "attrValueB");
final ObjectNode key = mapper.createObjectNode().put("recordKey", "recordValue");
final ObjectNode node = mapper.createObjectNode()
.put("recordA", 1).put("recordB", "valueB").set("recordKey", key);
final String value = mapper.writeValueAsString(node);
final MockFlowFile flowFile = new MockFlowFile(++ordinal);
flowFile.putAttributes(attributes);
flowFile.setData(value.getBytes(UTF_8));
// publish flowfile
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", TEST_TOPIC_PUBLISH);
runner.setProperty("publish-strategy", PublishStrategy.USE_WRAPPER.name());
runner.setProperty("message-key-field", "recordKey");
runner.setProperty("record-key-writer", "record-writer");
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> record = producedRecords.iterator().next();
assertEquals(TEST_TOPIC_PUBLISH, record.topic());
final Headers headers = record.headers();
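// no attribute-name-regex is configured in this test, so no FlowFile attributes are copied to Kafka headers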
assertEquals(0, headers.toArray().length);
assertNotNull(record.key());
final String keyString = mapper.writeValueAsString(key);
assertEquals(keyString, new String(record.key(), UTF_8));
assertNotNull(record.value());
assertEquals(value, new String(record.value(), UTF_8));
}
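// Builds a TestRunner wired with JSON reader/writer/key-writer controller services; the processor's
// publisher pool is replaced so produced Kafka records are captured in the supplied collection.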
private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
throws InitializationException {
final String readerId = "record-reader";
final RecordReaderFactory readerService = new JsonTreeReader();
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
final String keyWriterId = "record-key-writer";
final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter();
final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return getPublisherPool(producedRecords, context);
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false);
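// each controller service is registered under an ID matching the corresponding processor property name,
// so the property can simply be set to that ID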
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
runner.setProperty(readerId, readerId);
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(writerId, writerId);
runner.addControllerService(keyWriterId, keyWriterService);
runner.enableControllerService(keyWriterService);
runner.setProperty(keyWriterId, keyWriterId);
return runner;
}
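// Builds a PublisherPool from the runner's configured properties; obtainPublisher() is overridden
// to return the record-capturing lease below.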
private PublisherPool getPublisherPool(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final int maxMessageSize = context.getProperty("max.request.size").asDataSize(DataUnit.B).intValue();
final long maxAckWaitMillis = context.getProperty("ack.wait.time").asTimePeriod(TimeUnit.MILLISECONDS);
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
final boolean useTransactions = context.getProperty("use-transactions").asBoolean();
final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue();
Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class);
return new PublisherPool(
Collections.emptyMap(),
mock(ComponentLog.class),
maxMessageSize,
maxAckWaitMillis,
useTransactions,
transactionalIdSupplier,
attributeNamePattern,
charset,
publishStrategy,
recordKeyWriterFactory) {
@Override
public PublisherLease obtainPublisher() {
return getPublisherLease(producedRecords, context);
}
};
}
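// typed alias so Mockito can mock Producer<byte[], byte[]> with its generic binding intact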
public interface ProducerBB extends Producer<byte[], byte[]> {
}
private PublisherLease getPublisherLease(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
.asControllerService(RecordSetWriterFactory.class);
final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
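// capture every ProducerRecord handed to the mocked producer and immediately complete its callback with synthetic metadata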
when(producer.send(any(), any())).then(invocation -> {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
producedRecords.add(record);
final Callback callback = invocation.getArgument(1);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), null);
return null;
});
return new PublisherLease(
producer,
1024,
1000L,
mock(ComponentLog.class),
true,
patternAttributeName,
UTF_8,
null,
keyWriterFactory);
}
}

View File

@ -0,0 +1,289 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.commons.io.IOUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPublishKafkaMockParameterized {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ObjectMapper mapper = getObjectMapper();
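// each argument set: FlowFile input resource, message key field, attribute-name regex, FlowFile attributes,
// publish strategy, and the expected serialized ProducerRecord JSON resource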
public static Stream<Arguments> testCaseParametersProvider() {
return Stream.of(
arguments("PublishRecord/parameterized/flowfileInput1.json",
"account", ".*A.", getAttributes(), PublishStrategy.USE_VALUE,
"PublishRecord/parameterized/kafkaOutput1V.json"),
arguments("PublishRecord/parameterized/flowfileInput1.json",
"account", ".*B.", getAttributes(), PublishStrategy.USE_WRAPPER,
"PublishRecord/parameterized/kafkaOutput1W.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json",
"key", ".*1", getAttributes(), PublishStrategy.USE_VALUE,
"PublishRecord/parameterized/kafkaOutputAV.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json",
"key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER,
"PublishRecord/parameterized/kafkaOutputAW.json")
);
}
@ParameterizedTest
@MethodSource("testCaseParametersProvider")
public void testPublishKafkaRecord(final String flowfileInputResource,
final String messageKeyField,
final String attributeNameRegex,
final Map<String, String> attributes,
final PublishStrategy publishStrategy,
final String kafkaRecordExpectedOutputResource)
throws IOException, InitializationException {
final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(flowfileInputResource)));
logger.trace(new String(flowfileData, UTF_8));
final MockFlowFile flowFile = new MockFlowFile(1L);
flowFile.putAttributes(attributes);
flowFile.setData(flowfileData);
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", "test-topic");
runner.setProperty("attribute-name-regex", attributeNameRegex);
runner.setProperty("message-key-field", messageKeyField);
runner.setProperty("publish-strategy", publishStrategy.name());
runner.enqueue(flowFile);
runner.run(1);
// verify results
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next();
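// render the captured record as JSON via the custom serializers and compare it to the expected output resource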
final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter()
.withObjectIndenter(new DefaultIndenter().withLinefeed("\n"));
final String json = mapper.writer(prettyPrinter).writeValueAsString(kafkaRecord);
logger.trace(json);
final String kafkaRecordExpected = IOUtils.toString(Objects.requireNonNull(
getClass().getClassLoader().getResource(kafkaRecordExpectedOutputResource)), UTF_8);
assertEquals(kafkaRecordExpected, json);
}
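// ObjectMapper configured with custom serializers so captured ProducerRecord instances can be written out as comparable JSON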
private static ObjectMapper getObjectMapper() {
final ObjectMapper objectMapper = new ObjectMapper()
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
final SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(RecordHeader.class, new HeaderSerializer());
simpleModule.addSerializer(new ProducerRecordBBSerializer(objectMapper));
objectMapper.registerModule(simpleModule);
return objectMapper;
}
/**
 * Custom Jackson serialization for {@link RecordHeader}.
*/
private static class HeaderSerializer extends JsonSerializer<RecordHeader> {
@Override
public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("RecordHeader-key",
(recordHeader.key() == null) ? null : recordHeader.key());
jsonGenerator.writeObjectField("RecordHeader-value",
(recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8));
jsonGenerator.writeEndObject();
}
}
/**
 * Custom Jackson serialization for {@link ProducerRecord}.
*/
private static class ProducerRecordBBSerializer extends StdSerializer<ProducerRecord<byte[], byte[]>> {
private final ObjectMapper objectMapper;
protected ProducerRecordBBSerializer(ObjectMapper objectMapper) {
super(ProducerRecord.class, false);
this.objectMapper = objectMapper;
}
@Override
public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
jsonGenerator.writeObjectField("ProducerRecord-key",
(producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key()));
jsonGenerator.writeObjectField("ProducerRecord-value",
(producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value()));
jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers());
jsonGenerator.writeEndObject();
}
}
private static Map<String, String> getAttributes() {
final Map<String, String> attributes = new TreeMap<>();
attributes.put("attrKeyA1", "attrValueA1");
attributes.put("attrKeyA2", "attrValueA2");
attributes.put("attrKeyB1", "attrValueB1");
attributes.put("attrKeyB2", "attrValueB2");
return attributes;
}
private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
throws InitializationException {
final String readerId = "record-reader";
final RecordReaderFactory readerService = new JsonTreeReader();
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
final String keyWriterId = "record-key-writer";
final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter();
final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return getPublisherPool(producedRecords, context);
}
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
runner.setProperty(readerId, readerId);
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(writerId, writerId);
runner.addControllerService(keyWriterId, keyWriterService);
runner.enableControllerService(keyWriterService);
runner.setProperty(keyWriterId, keyWriterId);
return runner;
}
private PublisherPool getPublisherPool(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final int maxMessageSize = context.getProperty("max.request.size").asDataSize(DataUnit.B).intValue();
final long maxAckWaitMillis = context.getProperty("ack.wait.time").asTimePeriod(TimeUnit.MILLISECONDS);
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
final boolean useTransactions = context.getProperty("use-transactions").asBoolean();
final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue();
Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class);
return new PublisherPool(
Collections.emptyMap(),
mock(ComponentLog.class),
maxMessageSize,
maxAckWaitMillis,
useTransactions,
transactionalIdSupplier,
attributeNamePattern,
charset,
publishStrategy,
recordKeyWriterFactory) {
@Override
public PublisherLease obtainPublisher() {
return getPublisherLease(producedRecords, context);
}
};
}
public interface ProducerBB extends Producer<byte[], byte[]> {
}
private PublisherLease getPublisherLease(final Collection<ProducerRecord<byte[], byte[]>> producedRecords,
final ProcessContext context) {
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
.asControllerService(RecordSetWriterFactory.class);
final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
when(producer.send(any(), any())).then(invocation -> {
final ProducerRecord<byte[], byte[]> record = invocation.getArgument(0);
producedRecords.add(record);
final Callback callback = invocation.getArgument(1);
callback.onCompletion(new RecordMetadata(new TopicPartition("topic", 0), 0L, 0L, 0L, 0L, 0, 0), null);
return null;
});
return new PublisherLease(
producer,
1024,
1000L,
mock(ComponentLog.class),
true,
patternAttributeName,
UTF_8,
publishStrategy,
keyWriterFactory) {
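// pin the record timestamp so the serialized output is stable across test runs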
@Override
protected long getTimestamp() {
return 1000000000000L;
}
};
}
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.kafka.pubsub;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.function.Function;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.PUBLISH_USE_VALUE;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.PUBLISH_USE_WRAPPER;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestPublishKafkaRecordKey_2_6 {
private static final String TOPIC_NAME = "unit-test";
private PublisherPool mockPool;
private PublisherLease mockLease;
private TestRunner runner;
@BeforeEach
public void setup() throws InitializationException, IOException {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
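// run the real publish(...) logic against the otherwise-mocked lease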
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
any(RecordSchema.class), any(String.class), any(String.class), nullable(Function.class), any(PublishMetadataStrategy.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
runner = TestRunners.newTestRunner(new PublishKafkaRecord_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return mockPool;
}
});
runner.setProperty(PublishKafkaRecord_2_6.TOPIC, TOPIC_NAME);
final String readerId = "record-reader";
final MockRecordParser readerService = new MockRecordParser();
readerService.addSchemaField("name", RecordFieldType.STRING);
readerService.addSchemaField("age", RecordFieldType.INT);
runner.addControllerService(readerId, readerService);
runner.enableControllerService(readerService);
final String writerId = "record-writer";
final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
runner.addControllerService(writerId, writerService);
runner.enableControllerService(writerService);
runner.setProperty(PublishKafkaRecord_2_6.RECORD_READER, readerId);
runner.setProperty(PublishKafkaRecord_2_6.RECORD_WRITER, writerId);
runner.setProperty(PublishKafka_2_6.DELIVERY_GUARANTEE, PublishKafka_2_6.DELIVERY_REPLICATED);
}
@Test
public void testConfigValidation() {
runner.assertValid();
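// an unrecognized publish strategy value should fail validation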
runner.setProperty(PublishKafkaRecord_2_6.PUBLISH_STRATEGY, "foo");
runner.assertNotValid();
runner.setProperty(PublishKafkaRecord_2_6.PUBLISH_STRATEGY, PUBLISH_USE_VALUE);
runner.assertValid();
runner.setProperty(PublishKafkaRecord_2_6.PUBLISH_STRATEGY, PUBLISH_USE_WRAPPER);
runner.assertValid();
runner.setProperty(PublishKafkaRecord_2_6.RECORD_KEY_WRITER, "no-record-writer");
runner.assertNotValid();
runner.setProperty(PublishKafkaRecord_2_6.RECORD_KEY_WRITER, "record-writer");
runner.assertValid();
}
}

View File

@ -73,7 +73,7 @@ public class TestPublishKafkaRecord_2_6 {
mockPool = mock(PublisherPool.class);
mockLease = mock(PublisherLease.class);
Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
any(RecordSchema.class), any(String.class), any(String.class), nullable(Function.class));
any(RecordSchema.class), any(String.class), any(String.class), nullable(Function.class), any(PublishMetadataStrategy.class));
when(mockPool.obtainPublisher()).thenReturn(mockLease);
@ -113,7 +113,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -132,7 +132,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -148,7 +148,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@ -165,7 +165,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).close();
assertEquals(1, runner.getQueueSize().getObjectCount());
@ -217,7 +217,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 3);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
}
@ -236,7 +236,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_FAILURE, 0);
verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();
@ -261,9 +261,9 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 2);
verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(0)).publish(
any(FlowFile.class), any(Map.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class), any(Integer.class));
any(FlowFile.class), any(List.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class), any(Integer.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -293,7 +293,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(0)).poison();
verify(mockLease, times(1)).close();
@ -336,13 +336,13 @@ public class TestPublishKafkaRecord_2_6 {
return null;
}).when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class));
nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class), any(PublishMetadataStrategy.class));
runner.run();
runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class));
nullable(RecordSchema.class), nullable(String.class), any(String.class), nullable(Function.class), any(PublishMetadataStrategy.class));
assertEquals(2, partitionsByAge.size()); // 2 ages
@ -380,7 +380,7 @@ public class TestPublishKafkaRecord_2_6 {
runner.assertTransferCount(PublishKafkaRecord_2_6.REL_FAILURE, 4);
verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class));
AdditionalMatchers.or(any(RecordSchema.class), isNull()), eq(null), eq(TOPIC_NAME), nullable(Function.class), any(PublishMetadataStrategy.class));
verify(mockLease, times(1)).complete();
verify(mockLease, times(1)).close();

View File

@ -247,7 +247,7 @@ public class TestPublisherLease {
Mockito.when(writerFactory.createWriter(eq(logger), eq(schema), any(), eq(flowFile))).thenReturn(writer);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic, null);
lease.publish(flowFile, recordSet, writerFactory, schema, keyField, topic, null, PublishMetadataStrategy.USE_CONFIGURED_VALUES);
verify(writerFactory, times(2)).createWriter(eq(logger), eq(schema), any(), eq(flowFile));
verify(writer, times(2)).write(any(Record.class));
@ -259,7 +259,7 @@ public class TestPublisherLease {
private final AtomicInteger poisonCount = new AtomicInteger(0);
public PoisonCountingLease() {
super(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8);
super(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8, null, null);
}
@Override

View File

@ -37,7 +37,7 @@ public class TestPublisherPool {
kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, null, StandardCharsets.UTF_8);
final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, null, StandardCharsets.UTF_8, null, null);
assertEquals(0, pool.available());
final PublisherLease lease = pool.obtainPublisher();
@ -54,7 +54,7 @@ public class TestPublisherPool {
kafkaProperties.put("key.serializer", ByteArraySerializer.class.getName());
kafkaProperties.put("value.serializer", ByteArraySerializer.class.getName());
final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, null, StandardCharsets.UTF_8);
final PublisherPool pool = new PublisherPool(kafkaProperties, Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, null, StandardCharsets.UTF_8, null, null);
assertEquals(0, pool.available());
final PublisherLease lease = pool.obtainPublisher();

View File

@ -0,0 +1,8 @@
{
"address": "1234 First Street",
"zip": "12345",
"account": {
"name": "Acme",
"number": "AC1234"
}
}

View File

@ -0,0 +1,12 @@
{
"key": {
"type": "person"
},
"value": {
"name": "Mark",
"number": 49
},
"headers": {
"headerA": "headerAValue"
}
}

View File

@ -0,0 +1,21 @@
{
"ProducerRecord-key" : {
"name" : "Acme",
"number" : "AC1234"
},
"ProducerRecord-value" : {
"address" : "1234 First Street",
"zip" : "12345",
"account" : {
"name" : "Acme",
"number" : "AC1234"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attrKeyA1",
"RecordHeader-value" : "attrValueA1"
}, {
"RecordHeader-key" : "attrKeyA2",
"RecordHeader-value" : "attrValueA2"
} ]
}

View File

@ -0,0 +1,5 @@
{
"ProducerRecord-key" : null,
"ProducerRecord-value" : null,
"ProducerRecord-headers" : [ ]
}

View File

@ -0,0 +1,24 @@
{
"ProducerRecord-key" : {
"type" : "person"
},
"ProducerRecord-value" : {
"key" : {
"type" : "person"
},
"value" : {
"name" : "Mark",
"number" : 49
},
"headers" : {
"headerA" : "headerAValue"
}
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "attrKeyA1",
"RecordHeader-value" : "attrValueA1"
}, {
"RecordHeader-key" : "attrKeyB1",
"RecordHeader-value" : "attrValueB1"
} ]
}

View File

@ -0,0 +1,13 @@
{
"ProducerRecord-key" : {
"type" : "person"
},
"ProducerRecord-value" : {
"name" : "Mark",
"number" : 49
},
"ProducerRecord-headers" : [ {
"RecordHeader-key" : "headerA",
"RecordHeader-value" : "headerAValue"
} ]
}