NIFI-2670: This closes #954. Expose option for how to interpret the Kafka Key - hexadecimal encoding or UTF-8 String

Mark Payne 2016-08-25 21:00:45 -04:00 committed by joewitt
parent 102a9a2b74
commit 58e0ce7f92
12 changed files with 538 additions and 49 deletions
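
For context, the whole change reduces to one round trip between raw Kafka key bytes and the 'kafka.key' FlowFile attribute: the ConsumeKafka processors encode the record key into the attribute as either a UTF-8 string or uppercase hex (encodeKafkaKey), and the PublishKafka processors turn the attribute (or the 'Kafka Key' property) back into key bytes the same way (getMessageKey). Below is a minimal standalone sketch of that round trip, using the same javax.xml.bind.DatatypeConverter and StandardCharsets calls the diff relies on; the class and method names in the sketch are illustrative only and are not part of the commit.

    // Sketch (not part of the commit): the UTF-8 / hex key round trip introduced by this change.
    import java.nio.charset.StandardCharsets;
    import javax.xml.bind.DatatypeConverter;

    public class KeyEncodingSketch {

        // Mirrors ConsumeKafka's encodeKafkaKey(): raw key bytes -> 'kafka.key' attribute value.
        static String encode(final byte[] key, final boolean hex) {
            if (key == null) {
                return null;
            }
            return hex ? DatatypeConverter.printHexBinary(key) : new String(key, StandardCharsets.UTF_8);
        }

        // Mirrors PublishKafka's getMessageKey(): attribute value -> raw key bytes.
        static byte[] decode(final String attributeValue, final boolean hex) {
            if (attributeValue == null) {
                return null;
            }
            return hex ? DatatypeConverter.parseHexBinary(attributeValue) : attributeValue.getBytes(StandardCharsets.UTF_8);
        }

        public static void main(final String[] args) {
            final byte[] key = "key1".getBytes(StandardCharsets.UTF_8);
            System.out.println(encode(key, false)); // key1
            System.out.println(encode(key, true));  // 6B657931 (uppercase hex)
            System.out.println(new String(decode("6B657931", true), StandardCharsets.UTF_8)); // key1
        }
    }

Running the sketch prints "key1", "6B657931", and "key1" again, which matches the values asserted by the new validateUtf8Key and validateHexKey tests further down in the diff.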

View File

@@ -62,7 +62,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10"})
 @WritesAttributes({
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-        @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"),
+        @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -82,6 +83,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
         .name("topic")
         .displayName("Topic Name(s)")
@@ -110,6 +115,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         .defaultValue(OFFSET_LATEST.getValue())
         .build();
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .build();
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
         .name("message-demarcator")
         .displayName("Message Demarcator")
@@ -148,6 +162,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         descriptors.add(TOPICS);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -290,10 +305,24 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         }
     }
+    private String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null; // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
     private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final String offset = String.valueOf(firstRecord.offset());
-        final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
         final String topic = firstRecord.topic();
         final String partition = String.valueOf(firstRecord.partition());
         FlowFile flowFile = session.create();
@@ -309,8 +338,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         });
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyHex != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        if (keyValue != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
         }
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);

View File

@@ -57,7 +57,7 @@ final class KafkaProcessorUtils {
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
-    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";

View File

@@ -104,6 +104,10 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
         "DefaultPartitioner", "Messages will be assigned to random partitions.");
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
         .name("topic")
         .displayName("Topic Name")
@@ -155,6 +159,15 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         .expressionLanguageSupported(true)
         .build();
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .build();
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
         .name("message-demarcator")
         .displayName("Message Demarcator")
@@ -216,6 +229,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         _descriptors.add(TOPIC);
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
+        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
         _descriptors.add(MESSAGE_DEMARCATOR);
         _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
@@ -449,26 +463,18 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
      * regardless if it has #FAILED* attributes set.
      */
     private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
+        final byte[] keyBytes = getMessageKey(flowFile, context);
+        final String topicName;
+        final byte[] delimiterBytes;
         int lastAckedMessageIndex = -1;
         if (this.isFailedFlowFile(flowFile)) {
             lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
             topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
             delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
                 ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
         } else {
             topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
-            String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
-            if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
-                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
-            }
             delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
                 .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
@@ -480,6 +486,26 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         return publishingContext;
     }
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+        if (uninterpretedKey == null) {
+            return null;
+        }
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
     /**
      * Will remove FAILED_* attributes if FlowFile is no longer considered a
      * failed FlowFile

View File

@@ -301,6 +301,7 @@
     }
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         final TopicPartition tPart = new TopicPartition(topic, partition);
@@ -314,6 +315,23 @@
         return new ConsumerRecords(map);
     }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
+            final byte[] key = entry.getKey();
+            final byte[] rawRecord = entry.getValue();
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
     private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         for (final ConsumerRecords<byte[], byte[]> rec : records) {
@@ -493,4 +511,119 @@
         assertNull(null, mockPool.actualCommitOffsets);
     }
+    @Test
+    public void validateUtf8Key() {
+        String groupName = "validateGetAllMessages";
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+        assertEquals(expectedTopics, mockPool.actualTopics);
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+    @Test
+    public void validateHexKey() {
+        String groupName = "validateGetAllMessages";
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
+        runner.run(1, false);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+        assertEquals(expectedTopics, mockPool.actualTopics);
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
 }

View File

@@ -17,6 +17,10 @@
 package org.apache.nifi.processors.kafka.pubsub;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -326,4 +330,46 @@
         verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
     }
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
 }

View File

@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -53,6 +54,7 @@ public class StubPublishKafka extends PublishKafka_0_10 {
     private final int ackCheckSize;
     private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
@@ -66,6 +68,10 @@ public class StubPublishKafka extends PublishKafka_0_10 {
         this.executor.shutdownNow();
     }
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -107,7 +113,11 @@
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
                 String value = new String(record.value(), StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;

View File

@@ -62,7 +62,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURI
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
 @WritesAttributes({
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-        @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"),
+        @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+            + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
         @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -82,6 +83,10 @@ public class ConsumeKafka extends AbstractProcessor {
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
         .name("topic")
         .displayName("Topic Name(s)")
@@ -110,6 +115,15 @@ public class ConsumeKafka extends AbstractProcessor {
         .defaultValue(OFFSET_LATEST.getValue())
         .build();
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .build();
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
         .name("message-demarcator")
         .displayName("Message Demarcator")
@@ -148,6 +162,7 @@ public class ConsumeKafka extends AbstractProcessor {
         descriptors.add(TOPICS);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -290,10 +305,24 @@ public class ConsumeKafka extends AbstractProcessor {
         }
     }
+    private String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null; // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
     private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final String offset = String.valueOf(firstRecord.offset());
-        final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
         final String topic = firstRecord.topic();
         final String partition = String.valueOf(firstRecord.partition());
         FlowFile flowFile = session.create();
@@ -309,8 +338,8 @@ public class ConsumeKafka extends AbstractProcessor {
         });
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyHex != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        if (keyValue != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
         }
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);

View File

@@ -57,7 +57,7 @@ final class KafkaProcessorUtils {
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
-    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";

View File

@@ -104,6 +104,10 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
         "DefaultPartitioner", "Messages will be assigned to random partitions.");
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
         .name("topic")
         .displayName("Topic Name")
@@ -146,15 +150,23 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
         .name("kafka-key")
         .displayName("Kafka Key")
-        .description("The Key to use for the Message. It will be serialized as UTF-8 bytes. "
-            + "If not specified then the flow file attribute kafka.key.hex is used if present "
-            + "and we're not demarcating. In that case the hex string is coverted to its byte"
-            + "form and written as a byte[] key.")
+        .description("The Key to use for the Message. "
+            + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+            + "and we're not demarcating.")
         .required(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .expressionLanguageSupported(true)
         .build();
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+        .name("key-attribute-encoding")
+        .displayName("Key Attribute Encoding")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .required(true)
+        .defaultValue(UTF8_ENCODING.getValue())
+        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .build();
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
         .name("message-demarcator")
         .displayName("Message Demarcator")
@@ -216,6 +228,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         _descriptors.add(TOPIC);
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
+        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
         _descriptors.add(MESSAGE_DEMARCATOR);
         _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
@@ -449,26 +462,18 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
      * regardless if it has #FAILED* attributes set.
      */
     private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
+        final byte[] keyBytes = getMessageKey(flowFile, context);
+        final String topicName;
+        final byte[] delimiterBytes;
         int lastAckedMessageIndex = -1;
         if (this.isFailedFlowFile(flowFile)) {
             lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
             topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
             delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
                 ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
         } else {
             topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
-            String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
-            if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
-                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
-            }
             delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
                 .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
@@ -480,6 +485,26 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         return publishingContext;
     }
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+        if (uninterpretedKey == null) {
+            return null;
+        }
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
     /**
      * Will remove FAILED_* attributes if FlowFile is no longer considered a
      * failed FlowFile

View File

@@ -16,6 +16,12 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -25,24 +31,19 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 public class ConsumeKafkaTest {
@@ -301,6 +302,7 @@
     }
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         final TopicPartition tPart = new TopicPartition(topic, partition);
@@ -314,6 +316,23 @@
         return new ConsumerRecords(map);
     }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private ConsumerRecords<byte[], byte[]> createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
+            final byte[] key = entry.getKey();
+            final byte[] rawRecord = entry.getValue();
+            final ConsumerRecord<byte[], byte[]> rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
     private ConsumerRecords<byte[], byte[]> mergeRecords(final ConsumerRecords<byte[], byte[]>... records) {
         final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = new HashMap<>();
         for (final ConsumerRecords<byte[], byte[]> rec : records) {
@@ -493,4 +512,119 @@
         assertNull(null, mockPool.actualCommitOffsets);
     }
+    @Test
+    public void validateUtf8Key() {
+        String groupName = "validateGetAllMessages";
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+        runner.run(1, false);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+        assertEquals(expectedTopics, mockPool.actualTopics);
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+    @Test
+    public void validateHexKey() {
+        String groupName = "validateGetAllMessages";
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+        final ConsumerRecords<byte[], byte[]> firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
+        runner.run(1, false);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+        assertEquals(expectedTopics, mockPool.actualTopics);
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+        //assert that all consumers were closed as expected
+        //assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
 }

View File

@@ -17,6 +17,10 @@
 package org.apache.nifi.processors.kafka.pubsub;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -326,4 +330,46 @@
         verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
     }
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.KEY, "${myKey}");
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.KEY_ATTRIBUTE_ENCODING, PublishKafka.HEX_ENCODING);
+        runner.setProperty(PublishKafka.KEY, "${myKey}");
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
 }

View File

@@ -16,13 +16,16 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
-import java.lang.reflect.Field;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -38,9 +41,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +54,7 @@ public class StubPublishKafka extends PublishKafka {
     private final int ackCheckSize;
     private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
@@ -66,6 +68,10 @@ public class StubPublishKafka extends PublishKafka {
         this.executor.shutdownNow();
     }
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -82,6 +88,7 @@ public class StubPublishKafka extends PublishKafka {
         publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
         publisher.setAckWaitTime(15000);
         producer = mock(Producer.class);
         this.instrumentProducer(producer, false);
         Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
         kf.setAccessible(true);
@@ -107,7 +114,11 @@
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
                 String value = new String(record.value(), StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;