diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index 53d6fcd43c..847f8a4516 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -62,7 +62,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.10"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of the message if present and if single message. "
+        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -82,6 +83,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none",
             "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters.");
+
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -110,6 +115,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             .defaultValue(OFFSET_LATEST.getValue())
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -148,6 +162,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         descriptors.add(TOPICS);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -290,10 +305,24 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         }
     }
 
+    private String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null; // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
     private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final String offset = String.valueOf(firstRecord.offset());
-        final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
         final String topic = firstRecord.topic();
         final String partition = String.valueOf(firstRecord.partition());
         FlowFile flowFile = session.create();
@@ -309,8 +338,8 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         });
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyHex != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        if (keyValue != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
         }
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index c2cc32a5df..3ae749544c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -57,7 +57,7 @@ final class KafkaProcessorUtils {
 
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
-    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index 3ad2fc647b..5175f1374f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -104,6 +104,10 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
             "DefaultPartitioner", "Messages will be assigned to random partitions.");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")
@@ -155,6 +159,15 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -216,6 +229,7 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         _descriptors.add(TOPIC);
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
+        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
         _descriptors.add(MESSAGE_DEMARCATOR);
         _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
@@ -449,26 +463,18 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
      * regardless if it has #FAILED* attributes set.
      */
     private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
+        final byte[] keyBytes = getMessageKey(flowFile, context);
+
+        final String topicName;
+        final byte[] delimiterBytes;
         int lastAckedMessageIndex = -1;
         if (this.isFailedFlowFile(flowFile)) {
             lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
             topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
             delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
                     ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-
         } else {
             topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ?
-                    null : _key.getBytes(StandardCharsets.UTF_8);
-            String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
-            if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
-                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
-            }
             delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
                     .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
@@ -480,6 +486,26 @@ public class PublishKafka_0_10 extends AbstractSessionFactoryProcessor {
         return publishingContext;
     }
 
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+
+        if (uninterpretedKey == null) {
+            return null;
+        }
+
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
+
     /**
      * Will remove FAILED_* attributes if FlowFile is no longer considered a
      * failed FlowFile
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index c172b0316e..a85563d980 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -301,6 +301,7 @@ public class ConsumeKafkaTest {
 
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private ConsumerRecords createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
         final Map<TopicPartition, List<ConsumerRecord>> map = new HashMap<>();
         final TopicPartition tPart = new TopicPartition(topic, partition);
@@ -314,6 +315,23 @@ public class ConsumeKafkaTest {
         return new ConsumerRecords(map);
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private ConsumerRecords createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord> records = new ArrayList<>();
+        long offset = startingOffset;
+
+        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
+            final byte[] key = entry.getKey();
+            final byte[] rawRecord = entry.getValue();
+            final ConsumerRecord rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
     private ConsumerRecords mergeRecords(final ConsumerRecords... records) {
         final Map<TopicPartition, List<ConsumerRecord>> map = new HashMap<>();
         for (final ConsumerRecords rec : records) {
@@ -493,4 +511,119 @@ public class ConsumeKafkaTest {
         assertNull(null, mockPool.actualCommitOffsets);
     }
 
+    @Test
+    public void validateUtf8Key() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+        // assert that all consumers were closed as expected
+        // assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+
+    @Test
+    public void validateHexKey() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka_0_10.KEY_ATTRIBUTE_ENCODING, ConsumeKafka_0_10.HEX_ENCODING);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka_0_10.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+        // assert that all consumers were closed as expected
+        // assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index 5480ea7dd1..af0d343ad5 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -17,6 +17,10 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -326,4 +330,46 @@ public class PublishKafkaTest {
         verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
     }
 
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateHexKey";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka_0_10.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka_0_10.KEY_ATTRIBUTE_ENCODING, PublishKafka_0_10.HEX_ENCODING);
+        runner.setProperty(PublishKafka_0_10.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka_0_10.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 27d86f5065..c009014a86 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -53,6 +54,7 @@ public class StubPublishKafka extends PublishKafka_0_10 {
 
     private final int ackCheckSize;
     private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
 
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
@@ -66,6 +68,10 @@ public class StubPublishKafka extends PublishKafka_0_10 {
         this.executor.shutdownNow();
     }
 
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -107,7 +113,11 @@ public class StubPublishKafka extends PublishKafka_0_10 {
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
+
                 String value = new String(record.value(), StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index e5255f5764..0a3fe5d969 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -62,7 +62,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.SECURITY_PROTOCOL;
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "0.9.x"})
 @WritesAttributes({
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY_HEX, description = "The hex encoded key of message if present and if single message"),
+    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of the message if present and if single message. "
+        + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
     @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
@@ -82,6 +83,10 @@ public class ConsumeKafka extends AbstractProcessor {
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none",
             "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters.");
+
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -110,6 +115,15 @@ public class ConsumeKafka extends AbstractProcessor {
             .defaultValue(OFFSET_LATEST.getValue())
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -148,6 +162,7 @@ public class ConsumeKafka extends AbstractProcessor {
         descriptors.add(TOPICS);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
+        descriptors.add(KEY_ATTRIBUTE_ENCODING);
         descriptors.add(MESSAGE_DEMARCATOR);
         descriptors.add(MAX_POLL_RECORDS);
         DESCRIPTORS = Collections.unmodifiableList(descriptors);
@@ -290,10 +305,24 @@ public class ConsumeKafka extends AbstractProcessor {
         }
     }
 
+    private String encodeKafkaKey(final byte[] key, final String encoding) {
+        if (key == null) {
+            return null;
+        }
+
+        if (HEX_ENCODING.getValue().equals(encoding)) {
+            return DatatypeConverter.printHexBinary(key);
+        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+            return new String(key, StandardCharsets.UTF_8);
+        } else {
+            return null; // won't happen because it is guaranteed by the Allowable Values
+        }
+    }
+
     private void writeData(final ProcessContext context, final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final long startTimeNanos) {
         final ConsumerRecord<byte[], byte[]> firstRecord = records.get(0);
         final String offset = String.valueOf(firstRecord.offset());
-        final String keyHex = (firstRecord.key() != null) ? DatatypeConverter.printHexBinary(firstRecord.key()) : null;
+        final String keyValue = encodeKafkaKey(firstRecord.key(), context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue());
         final String topic = firstRecord.topic();
         final String partition = String.valueOf(firstRecord.partition());
         FlowFile flowFile = session.create();
@@ -309,8 +338,8 @@ public class ConsumeKafka extends AbstractProcessor {
         });
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, offset);
-        if (keyHex != null && records.size() == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY_HEX, keyHex);
+        if (keyValue != null && records.size() == 1) {
+            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, keyValue);
         }
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, partition);
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, topic);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index fd747fca02..c74ad18596 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -57,7 +57,7 @@ final class KafkaProcessorUtils {
 
     static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
 
-    static final String KAFKA_KEY_HEX = "kafka.key.hex";
+    static final String KAFKA_KEY = "kafka.key";
     static final String KAFKA_TOPIC = "kafka.topic";
     static final String KAFKA_PARTITION = "kafka.partition";
     static final String KAFKA_OFFSET = "kafka.offset";
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
index 65f386e8a8..4e1403dbf4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka.java
@@ -104,6 +104,10 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
             "DefaultPartitioner", "Messages will be assigned to random partitions.");
 
+    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+            "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")
@@ -146,15 +150,23 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
     static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
             .name("kafka-key")
             .displayName("Kafka Key")
-            .description("The Key to use for the Message. It will be serialized as UTF-8 bytes. "
-                    + "If not specified then the flow file attribute kafka.key.hex is used if present "
-                    + "and we're not demarcating. In that case the hex string is coverted to its byte"
-                    + "form and written as a byte[] key.")
+            .description("The Key to use for the Message. "
+                    + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+                    + "and we're not demarcating.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+            .name("key-attribute-encoding")
+            .displayName("Key Attribute Encoding")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .required(true)
+            .defaultValue(UTF8_ENCODING.getValue())
+            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .build();
+
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
             .name("message-demarcator")
             .displayName("Message Demarcator")
@@ -216,6 +228,7 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         _descriptors.add(TOPIC);
         _descriptors.add(DELIVERY_GUARANTEE);
         _descriptors.add(KEY);
+        _descriptors.add(KEY_ATTRIBUTE_ENCODING);
         _descriptors.add(MESSAGE_DEMARCATOR);
         _descriptors.add(MAX_REQUEST_SIZE);
         _descriptors.add(META_WAIT_TIME);
@@ -449,26 +462,18 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
      * regardless if it has #FAILED* attributes set.
      */
     private PublishingContext buildPublishingContext(FlowFile flowFile, ProcessContext context, InputStream contentStream) {
-        String topicName;
-        byte[] keyBytes;
-        byte[] delimiterBytes = null;
+        final byte[] keyBytes = getMessageKey(flowFile, context);
+
+        final String topicName;
+        final byte[] delimiterBytes;
         int lastAckedMessageIndex = -1;
         if (this.isFailedFlowFile(flowFile)) {
             lastAckedMessageIndex = Integer.valueOf(flowFile.getAttribute(FAILED_LAST_ACK_IDX));
             topicName = flowFile.getAttribute(FAILED_TOPIC_ATTR);
-            keyBytes = flowFile.getAttribute(FAILED_KEY_ATTR) != null
-                    ? flowFile.getAttribute(FAILED_KEY_ATTR).getBytes(StandardCharsets.UTF_8) : null;
             delimiterBytes = flowFile.getAttribute(FAILED_DELIMITER_ATTR) != null
                     ? flowFile.getAttribute(FAILED_DELIMITER_ATTR).getBytes(StandardCharsets.UTF_8) : null;
-
         } else {
             topicName = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
-            String _key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-            keyBytes = _key == null ? null : _key.getBytes(StandardCharsets.UTF_8);
-            String keyHex = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY_HEX);
-            if (_key == null && keyHex != null && KafkaProcessorUtils.HEX_KEY_PATTERN.matcher(keyHex).matches()) {
-                keyBytes = DatatypeConverter.parseHexBinary(keyHex);
-            }
             delimiterBytes = context.getProperty(MESSAGE_DEMARCATOR).isSet() ? context.getProperty(MESSAGE_DEMARCATOR)
                     .evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8) : null;
         }
@@ -480,6 +485,26 @@ public class PublishKafka extends AbstractSessionFactoryProcessor {
         return publishingContext;
     }
 
+    private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+        final String uninterpretedKey;
+        if (context.getProperty(KEY).isSet()) {
+            uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+        }
+
+        if (uninterpretedKey == null) {
+            return null;
+        }
+
+        final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+            return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+        }
+
+        return DatatypeConverter.parseHexBinary(uninterpretedKey);
+    }
+
     /**
      * Will remove FAILED_* attributes if FlowFile is no longer considered a
      * failed FlowFile
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7874d4de82..7e4b12c171 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -25,24 +31,19 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
+
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 public class ConsumeKafkaTest {
@@ -301,6 +302,7 @@ public class ConsumeKafkaTest {
 
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
     private ConsumerRecords createConsumerRecords(final String topic, final int partition, final long startingOffset, final byte[][] rawRecords) {
         final Map<TopicPartition, List<ConsumerRecord>> map = new HashMap<>();
         final TopicPartition tPart = new TopicPartition(topic, partition);
@@ -314,6 +316,23 @@ public class ConsumeKafkaTest {
         return new ConsumerRecords(map);
     }
 
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private ConsumerRecords createConsumerRecords(final String topic, final int partition, final long startingOffset, final Map<byte[], byte[]> rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord>> map = new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord> records = new ArrayList<>();
+        long offset = startingOffset;
+
+        for (final Map.Entry<byte[], byte[]> entry : rawRecords.entrySet()) {
+            final byte[] key = entry.getKey();
+            final byte[] rawRecord = entry.getValue();
+            final ConsumerRecord rec = new ConsumerRecord(topic, partition, offset++, key, rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
     private ConsumerRecords mergeRecords(final ConsumerRecords... records) {
         final Map<TopicPartition, List<ConsumerRecord>> map = new HashMap<>();
         for (final ConsumerRecords rec : records) {
@@ -493,4 +512,119 @@ public class ConsumeKafkaTest {
         assertNull(null, mockPool.actualCommitOffsets);
     }
 
+    @Test
+    public void validateUtf8Key() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "key1".equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+        // assert that all consumers were closed as expected
+        // assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
+
+    @Test
+    public void validateHexKey() {
+        String groupName = "validateGetAllMessages";
+
+        final Map<byte[], byte[]> rawRecords = new HashMap<>();
+        rawRecords.put("key1".getBytes(), "Hello-1".getBytes());
+        rawRecords.put(new byte[0], "Hello-2".getBytes());
+        rawRecords.put(null, "Hello-3".getBytes());
+
+        final ConsumerRecords firstRecs = createConsumerRecords("foo", 1, 1L, rawRecords);
+
+        final List<String> expectedTopics = new ArrayList<>();
+        expectedTopics.add("foo");
+        expectedTopics.add("bar");
+        final MockConsumerPool mockPool = new MockConsumerPool(1, expectedTopics, Collections.emptyMap(), null);
+        mockPool.nextPlannedRecordsQueue.add(firstRecs);
+
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final int maxLeases, final List<String> topics, final Map<String, String> props, final ComponentLog log) {
+                return mockPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka.KEY_ATTRIBUTE_ENCODING, ConsumeKafka.HEX_ENCODING);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(expectedTopics, mockPool.actualTopics);
+
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-1")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-2")).count());
+        assertEquals(1, flowFiles.stream().map(ff -> new String(ff.toByteArray())).filter(content -> content.equals("Hello-3")).count());
+
+        final String expectedHex = (Integer.toHexString('k') + Integer.toHexString('e') + Integer.toHexString('y') + Integer.toHexString('1')).toUpperCase();
+
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> expectedHex.equals(key)).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> key == null).count());
+        assertEquals(1, flowFiles.stream().map(ff -> ff.getAttribute(KafkaProcessorUtils.KAFKA_KEY)).filter(key -> "".equals(key)).count());
+
+        // assert that all consumers were closed as expected
+        // assert that the consumer pool was properly closed
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertFalse(mockPool.wasPoolClosed);
+        runner.run(1, true);
+        assertFalse(mockPool.wasConsumerLeasePoisoned);
+        assertTrue(mockPool.wasConsumerLeaseClosed);
+        assertTrue(mockPool.wasPoolClosed);
+    }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
index 07ae2da106..d81f0c177f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -17,6 +17,10 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -326,4 +330,46 @@ public class PublishKafkaTest {
         verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
         runner.shutdown();
     }
 
+    @Test
+    public void validateUtf8Key() {
+        String topicName = "validateUtf8Key";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "key1");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+        assertTrue(Arrays.equals("key1".getBytes(StandardCharsets.UTF_8), msgKey));
+    }
+
+    @Test
+    public void validateHexKey() {
+        String topicName = "validateHexKey";
+        StubPublishKafka putKafka = new StubPublishKafka(100);
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.KEY_ATTRIBUTE_ENCODING, PublishKafka.HEX_ENCODING);
+        runner.setProperty(PublishKafka.KEY, "${myKey}");
+
+        final Map<String, String> attributes = Collections.singletonMap("myKey", "6B657931");
+        runner.enqueue("Hello World".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        final Map<Object, Object> msgs = putKafka.getMessagesSent();
+        assertEquals(1, msgs.size());
+        final byte[] msgKey = (byte[]) msgs.keySet().iterator().next();
+
+        assertTrue(Arrays.equals(new byte[] {0x6B, 0x65, 0x79, 0x31}, msgKey));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
index 950d623c65..533655e464 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -16,13 +16,16 @@
  */
 package org.apache.nifi.processors.kafka.pubsub;
 
-import java.lang.reflect.Field;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -38,9 +41,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.BOOTSTRAP_SERVERS;
 import org.mockito.Mockito;
-import static org.mockito.Mockito.mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,6 +54,7 @@ public class StubPublishKafka extends PublishKafka {
 
     private final int ackCheckSize;
     private final ExecutorService executor = Executors.newCachedThreadPool();
+    private final Map<Object, Object> msgsSent = new ConcurrentHashMap<>();
 
     StubPublishKafka(int ackCheckSize) {
         this.ackCheckSize = ackCheckSize;
@@ -66,6 +68,10 @@ public class StubPublishKafka extends PublishKafka {
         this.executor.shutdownNow();
     }
 
+    public Map<Object, Object> getMessagesSent() {
+        return new HashMap<>(msgsSent);
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     protected KafkaPublisher buildKafkaResource(ProcessContext context, ProcessSession session)
@@ -82,6 +88,7 @@ public class StubPublishKafka extends PublishKafka {
             publisher = (KafkaPublisher) TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
             publisher.setAckWaitTime(15000);
             producer = mock(Producer.class);
+            this.instrumentProducer(producer, false);
 
             Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
             kf.setAccessible(true);
@@ -107,7 +114,11 @@ public class StubPublishKafka extends PublishKafka {
         when(producer.send(Mockito.any(ProducerRecord.class))).then(new Answer<Future<RecordMetadata>>() {
             @Override
             public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
-                ProducerRecord<byte[], byte[]> record = (ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                if (record != null && record.key() != null) {
+                    msgsSent.put(record.key(), record.value());
+                }
+
                 String value = new String(record.value(), StandardCharsets.UTF_8);
                 if ("fail".equals(value) && !StubPublishKafka.this.failed) {
                     StubPublishKafka.this.failed = true;
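
A minimal, self-contained sketch of the round trip that the new 'Key Attribute Encoding' property enables. It assumes a Java 8 runtime, where javax.xml.bind.DatatypeConverter (the class used by the patch) is on the classpath; the class name KeyEncodingRoundTrip is illustrative only and does not appear in the patch.

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import javax.xml.bind.DatatypeConverter;

    public class KeyEncodingRoundTrip {
        public static void main(String[] args) {
            // Raw key bytes, as they would arrive in ConsumerRecord.key().
            final byte[] rawKey = "key1".getBytes(StandardCharsets.UTF_8);

            // Consume side, hex encoding: printHexBinary yields uppercase hex,
            // which is what ConsumeKafka writes to the 'kafka.key' attribute.
            final String hexAttr = DatatypeConverter.printHexBinary(rawKey);
            System.out.println(hexAttr); // 6B657931

            // Publish side, hex encoding: parseHexBinary restores the exact bytes,
            // mirroring getMessageKey() in PublishKafka.
            final byte[] decoded = DatatypeConverter.parseHexBinary(hexAttr);
            System.out.println(Arrays.equals(rawKey, decoded)); // true

            // The UTF-8 encoding is lossless only for keys that are valid UTF-8
            // text; arbitrary binary keys should use the hex encoding instead.
            final String utf8Attr = new String(rawKey, StandardCharsets.UTF_8);
            System.out.println(utf8Attr); // key1
        }
    }

Note that the hex value 6B657931 used in the validateHexKey tests is exactly printHexBinary("key1".getBytes()), so the tests exercise both directions of this round trip.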