From c610aab3cb017d7030381f2715de446923870966 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 27 Oct 2020 12:46:12 -0400 Subject: [PATCH] NIFI-7953: Updated ConsumeKafka_2_0/ConsumeKafkaRecord_2_0/ConsumeKafka_2_6/ConsumeKafkaRecord_2_6 to allow separating records by key --- .../org/apache/nifi/util/MockFlowFile.java | 3 +- .../kafka/pubsub/ConsumeKafkaRecord_2_0.java | 49 ++++++++++++++----- .../kafka/pubsub/ConsumeKafka_2_0.java | 17 ++++++- .../kafka/pubsub/ConsumerLease.java | 37 +++++++++----- .../processors/kafka/pubsub/ConsumerPool.java | 33 ++++++++----- .../kafka/pubsub/KafkaProcessorUtils.java | 2 + .../kafka/pubsub/ConsumerPoolTest.java | 2 + .../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 31 +++++++++++- .../kafka/pubsub/ConsumeKafka_2_6.java | 18 ++++++- .../kafka/pubsub/ConsumerLease.java | 33 +++++++++---- .../processors/kafka/pubsub/ConsumerPool.java | 21 ++++++-- .../kafka/pubsub/KafkaProcessorUtils.java | 5 +- .../kafka/pubsub/ConsumerPoolTest.java | 2 + 13 files changed, 194 insertions(+), 59 deletions(-) diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java index 375158c4d3..254320edd5 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java @@ -226,7 +226,8 @@ public class MockFlowFile implements FlowFileRecord { } public void assertAttributeEquals(final String attributeName, final String expectedValue) { - Assert.assertEquals(expectedValue, attributes.get(attributeName)); + Assert.assertEquals("Expected attribute " + attributeName + " to be " + expectedValue + " but instead it was " + attributes.get(attributeName), + expectedValue, attributes.get(attributeName)); } public void assertAttributeNotEquals(final String attributeName, final String expectedValue) { diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java index 7f5c75abe0..79824b190b 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java @@ -34,7 +34,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -57,6 +56,12 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; +import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING; + @CapabilityDescription("Consumes 
messages from Apache Kafka specifically built against the Kafka 2.0 Consumer API. " + "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_0. Please note that, at this time, the Processor assumes that " + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the " @@ -77,7 +82,7 @@ description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." + " In the event a dynamic property represents a property that was already set, its value will be ignored and a WARN message logged." + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.", - expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY) + expressionLanguageScope = VARIABLE_REGISTRY) @SeeAlso({ConsumeKafka_2_0.class, PublishKafka_2_0.class, PublishKafkaRecord_2_0.class}) public class ConsumeKafkaRecord_2_0 extends AbstractProcessor { @@ -93,7 +98,7 @@ .description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); static final PropertyDescriptor TOPIC_TYPE = new Builder() @@ -110,7 +115,7 @@ .displayName("Record Reader") .description("The Record Reader to use for incoming FlowFiles") .identifiesControllerService(RecordReaderFactory.class) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(NONE) .required(true) .build(); @@ -119,7 +124,7 @@ .displayName("Record Writer") .description("The Record Writer to use in order to serialize the data before sending to Kafka") .identifiesControllerService(RecordSetWriterFactory.class) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(NONE) .required(true) .build(); @@ -129,7 +134,7 @@ .description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.") .required(true) .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); static final PropertyDescriptor AUTO_OFFSET_RESET = new Builder() @@ -179,7 +184,7 @@ + "read_uncommitted. This means that messages will be pulled as soon as they are written to Kafka, even if the producer later cancels the transaction.
If " + "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait " + "for the producer to finish its entire transaction instead of pulling as the messages become available.") - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(NONE) .allowableValues("true", "false") .defaultValue("true") .required(true) @@ -203,9 +208,26 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor { + "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling " + "the messages together efficiently.") .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .expressionLanguageSupported(NONE) .required(false) .build(); + static final PropertyDescriptor SEPARATE_BY_KEY = new Builder() + .name("separate-by-key") + .displayName("Separate By Key") + .description("If true, two Records will only be added to the same FlowFile if both of the Kafka Messages have identical keys.") + .required(false) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("key-attribute-encoding") + .displayName("Key Attribute Encoding") + .description("If the property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + + "'. This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(UTF8_ENCODING.getValue()) + .allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE) + .build(); static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") @@ -242,6 +264,8 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor { descriptors.add(KafkaProcessorUtils.TOKEN_AUTH); descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); descriptors.add(GROUP_ID); + descriptors.add(SEPARATE_BY_KEY); + descriptors.add(KEY_ATTRIBUTE_ENCODING); descriptors.add(AUTO_OFFSET_RESET); descriptors.add(MESSAGE_HEADER_ENCODING); descriptors.add(HEADER_NAME_REGEX); @@ -283,7 +307,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor { .name(propertyDescriptorName) .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)) .dynamic(true) - .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .expressionLanguageSupported(VARIABLE_REGISTRY) .build(); } @@ -328,6 +352,9 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor { final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue(); final Pattern headerNamePattern = headerNameRegex == null ? 
null : Pattern.compile(headerNameRegex); + final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean(); + final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); + if (topicType.equals(TOPIC_NAME.getValue())) { for (final String topic : topicListing.split(",", 100)) { final String trimmedName = topic.trim(); if (!trimmedName.isEmpty()) { topics.add(trimmedName); } } return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol, - bootstrapServers, log, honorTransactions, charset, headerNamePattern); + bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding); } else if (topicType.equals(TOPIC_PATTERN.getValue())) { final Pattern topicPattern = Pattern.compile(topicListing.trim()); return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol, - bootstrapServers, log, honorTransactions, charset, headerNamePattern); + bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding); } else { getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType}); return null; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java index 13bebc96d6..fc00693e27 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java @@ -146,6 +146,16 @@ + "will result in a single FlowFile each " + "time it is triggered. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on the OS") .build(); + + static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder() + .name("separate-by-key") + .displayName("Separate By Key") + .description("If true, and the <Message Demarcator> property is set, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.") + .required(false) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder() .name("header-name-regex") .displayName("Headers to Add as Attributes (Regex)") @@ -234,6 +244,7 @@ descriptors.add(AUTO_OFFSET_RESET); descriptors.add(KEY_ATTRIBUTE_ENCODING); descriptors.add(MESSAGE_DEMARCATOR); + descriptors.add(SEPARATE_BY_KEY); descriptors.add(MESSAGE_HEADER_ENCODING); descriptors.add(HEADER_NAME_REGEX); descriptors.add(MAX_POLL_RECORDS); @@ -315,6 +326,8 @@ final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue(); final Pattern headerNamePattern = headerNameRegex == null ?
null : Pattern.compile(headerNameRegex); + final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean(); + if (topicType.equals(TOPIC_NAME.getValue())) { for (final String topic : topicListing.split(",", 100)) { final String trimmedName = topic.trim(); @@ -323,11 +336,11 @@ public class ConsumeKafka_2_0 extends AbstractProcessor { } } - return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, + return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log, honorTransactions, charset, headerNamePattern); } else if (topicType.equals(TOPIC_PATTERN.getValue())) { final Pattern topicPattern = Pattern.compile(topicListing.trim()); - return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, + return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log, honorTransactions, charset, headerNamePattern); } else { getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType}); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 2674dd9dc3..3ecec49a9d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -49,6 +49,7 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe private final RecordReaderFactory readerFactory; private final Charset headerCharacterSet; private final Pattern headerNamePattern; + private final boolean separateByKey; private boolean poisoned = false; //used for tracking demarcated flowfiles to their TopicPartition so we can append //to them on subsequent poll calls @@ -103,7 +105,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordSetWriterFactory writerFactory, final ComponentLog logger, final Charset headerCharacterSet, - final Pattern headerNamePattern) { + final Pattern headerNamePattern, + final boolean separateByKey) { this.maxWaitMillis = maxWaitMillis; this.kafkaConsumer = kafkaConsumer; this.demarcatorBytes = demarcatorBytes; @@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe this.logger = logger; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; } /** @@ -164,7 +168,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe * flowfiles necessary or appends to existing ones if in demarcation mode. 
*/ void poll() { - /** + /* * Implementation note: * Even if ConsumeKafka is not scheduled to poll because downstream connection back-pressure is engaged * for longer than session.timeout.ms (defaults to 10 sec), the Kafka consumer sends heartbeats from a background thread. @@ -202,7 +206,7 @@ return false; } try { - /** + /* * Committing the NiFi session and then the offsets means we have an at * least once guarantee here. If we reversed the order we'd have at * most once. @@ -412,7 +416,7 @@ private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) { // Group the Records by their BundleInformation final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream() - .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec)))); + .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ? rec.key() : null))); for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) { final BundleInformation bundleInfo = entry.getKey(); @@ -538,7 +542,7 @@ while ((record = reader.nextRecord()) != null) { // Determine the bundle for this record. final RecordSchema recordSchema = record.getSchema(); - final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes); + final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null); BundleTracker tracker = bundleMap.get(bundleInfo); if (tracker == null) { @@ -626,9 +630,16 @@ final Map<String, String> kafkaAttrs = new HashMap<>(); kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset)); kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp)); - if (tracker.key != null && tracker.totalRecords == 1) { - kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key); + + // If we have a Kafka key, we will add it as an attribute only if + // the FlowFile contains a single Record, or if the Records have been separated by Key, + // because we then know that even though there are multiple Records, they all have the same key.
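+ // Even when the key is uniform across the bundle, the nested keyEncoding check below can still
+ // suppress the attribute: selecting the 'Do Not Add Key as Attribute' encoding omits kafka.key entirely.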
+ if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) { + if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) { + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key); + } } + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition)); kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic); if (tracker.totalRecords > 1) { @@ -647,8 +658,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe tracker.updateFlowFile(newFlowFile); } - private static class BundleTracker { + private static class BundleTracker { final long initialOffset; final long initialTimestamp; final int partition; @@ -678,23 +689,24 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe private void updateFlowFile(final FlowFile flowFile) { this.flowFile = flowFile; } - } private static class BundleInformation { private final TopicPartition topicPartition; private final RecordSchema schema; private final Map attributes; + private final byte[] messageKey; - public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map attributes) { + public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map attributes, final byte[] messageKey) { this.topicPartition = topicPartition; this.schema = schema; this.attributes = attributes; + this.messageKey = messageKey; } @Override public int hashCode() { - return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode()); + return 41 + Objects.hash(topicPartition, schema, attributes) + 37 * Arrays.hashCode(messageKey); } @Override @@ -710,7 +722,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } final BundleInformation other = (BundleInformation) obj; - return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes); + return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes) + && Arrays.equals(this.messageKey, other.messageKey); } } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java index 7f02b2605b..04627297ef 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -59,6 +59,7 @@ public class ConsumerPool implements Closeable { private final RecordSetWriterFactory writerFactory; private final Charset headerCharacterSet; private final Pattern headerNamePattern; + private final boolean separateByKey; private final AtomicLong consumerCreatedCountRef = new AtomicLong(); private final AtomicLong consumerClosedCountRef = new AtomicLong(); private final AtomicLong leasesObtainedCountRef = new AtomicLong(); @@ -86,6 +87,7 @@ public class ConsumerPool implements Closeable { public ConsumerPool( final int maxConcurrentLeases, final byte[] demarcator, + final boolean separateByKey, final Map kafkaProperties, final List 
topics, final long maxWaitMillis, @@ -111,11 +113,13 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; } public ConsumerPool( final int maxConcurrentLeases, final byte[] demarcator, + final boolean separateByKey, final Map kafkaProperties, final Pattern topics, final long maxWaitMillis, @@ -141,6 +145,7 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; } public ConsumerPool( @@ -155,12 +160,13 @@ public class ConsumerPool implements Closeable { final ComponentLog logger, final boolean honorTransactions, final Charset headerCharacterSet, - final Pattern headerNamePattern) { + final Pattern headerNamePattern, + final boolean separateByKey, + final String keyEncoding) { this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = null; - this.keyEncoding = null; this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.securityProtocol = securityProtocol; @@ -171,6 +177,8 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; + this.keyEncoding = keyEncoding; } public ConsumerPool( @@ -185,12 +193,13 @@ public class ConsumerPool implements Closeable { final ComponentLog logger, final boolean honorTransactions, final Charset headerCharacterSet, - final Pattern headerNamePattern) { + final Pattern headerNamePattern, + final boolean separateByKey, + final String keyEncoding) { this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = null; - this.keyEncoding = null; this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.securityProtocol = securityProtocol; @@ -201,6 +210,8 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; + this.keyEncoding = keyEncoding; } /** @@ -218,7 +229,8 @@ public class ConsumerPool implements Closeable { if (lease == null) { final Consumer consumer = createKafkaConsumer(); consumerCreatedCountRef.incrementAndGet(); - /** + + /* * For now return a new consumer lease. But we could later elect to * have this return null if we determine the broker indicates that * the lag time on all topics being monitored is sufficiently low. @@ -228,10 +240,9 @@ public class ConsumerPool implements Closeable { * sitting idle which could prompt excessive rebalances. */ lease = new SimpleConsumerLease(consumer); - /** - * This subscription tightly couples the lease to the given - * consumer. They cannot be separated from then on. - */ + + // This subscription tightly couples the lease to the given + // consumer. They cannot be separated from then on. 
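+ // Exactly one of 'topics' (a fixed list of names) or 'topicPattern' (a regex) is non-null here,
+ // depending on which ConsumerPool constructor was used, so the branch below is exhaustive.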
if (topics != null) { consumer.subscribe(topics, lease); } else { @@ -268,7 +279,7 @@ public class ConsumerPool implements Closeable { public void close() { final List leases = new ArrayList<>(); pooledLeases.drainTo(leases); - leases.stream().forEach((lease) -> { + leases.forEach((lease) -> { lease.close(true); }); } @@ -301,7 +312,7 @@ public class ConsumerPool implements Closeable { private SimpleConsumerLease(final Consumer consumer) { super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, - readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern); + readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey); this.consumer = consumer; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index e756776f55..b089fce5b6 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -60,6 +60,8 @@ public final class KafkaProcessorUtils { static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters"); + static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute", + "The key will not be added as an Attribute"); static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+"); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 9d53ee652a..d006a6ef92 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -70,6 +70,7 @@ public class ConsumerPoolTest { testPool = new ConsumerPool( 1, null, + false, Collections.emptyMap(), Collections.singletonList("nifi"), 100L, @@ -88,6 +89,7 @@ public class ConsumerPoolTest { testDemarcatedPool = new ConsumerPool( 1, "--demarcator--".getBytes(StandardCharsets.UTF_8), + false, Collections.emptyMap(), Collections.singletonList("nifi"), 100L, diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java index 80430587bf..3e7b16a28c 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java @@ -57,6 +57,10 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING; +import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING; + @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.6 Consumer API. " + "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_6. Please note that, at this time, the Processor assumes that " + "all records that are retrieved from a given partition have the same schema. If any of the Kafka messages are pulled but cannot be parsed or written with the " @@ -207,6 +211,24 @@ .required(false) .build(); + static final PropertyDescriptor SEPARATE_BY_KEY = new Builder() + .name("separate-by-key") + .displayName("Separate By Key") + .description("If true, two Records will only be added to the same FlowFile if both of the Kafka Messages have identical keys.") + .required(false) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder() + .name("key-attribute-encoding") + .displayName("Key Attribute Encoding") + .description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + + "'. This property dictates how the value of the attribute should be encoded.") + .required(true) + .defaultValue(UTF8_ENCODING.getValue()) + .allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE) + .build(); + static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("FlowFiles received from Kafka. Depending on demarcation strategy it is a flow file per message or a bundle of messages grouped by topic and partition.") .build(); @@ -242,6 +264,8 @@ descriptors.add(KafkaProcessorUtils.TOKEN_AUTH); descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE); descriptors.add(GROUP_ID); + descriptors.add(SEPARATE_BY_KEY); + descriptors.add(KEY_ATTRIBUTE_ENCODING); descriptors.add(AUTO_OFFSET_RESET); descriptors.add(MESSAGE_HEADER_ENCODING); descriptors.add(HEADER_NAME_REGEX); @@ -328,6 +352,9 @@ final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue(); final Pattern headerNamePattern = headerNameRegex == null ?
null : Pattern.compile(headerNameRegex); + final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean(); + final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue(); + if (topicType.equals(TOPIC_NAME.getValue())) { for (final String topic : topicListing.split(",", 100)) { final String trimmedName = topic.trim(); if (!trimmedName.isEmpty()) { topics.add(trimmedName); } } return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol, - bootstrapServers, log, honorTransactions, charset, headerNamePattern); + bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding); } else if (topicType.equals(TOPIC_PATTERN.getValue())) { final Pattern topicPattern = Pattern.compile(topicListing.trim()); return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol, - bootstrapServers, log, honorTransactions, charset, headerNamePattern); + bootstrapServers, log, honorTransactions, charset, headerNamePattern, separateByKey, keyEncoding); } else { getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType}); return null; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index c96bb60a9a..5461abbc6f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -146,6 +146,17 @@ + "will result in a single FlowFile each " + "time it is triggered. To enter a special character such as 'new line', use CTRL+Enter or Shift+Enter, depending on the OS") .build(); + + + static final PropertyDescriptor SEPARATE_BY_KEY = new PropertyDescriptor.Builder() + .name("separate-by-key") + .displayName("Separate By Key") + .description("If true, and the <Message Demarcator> property is set, two messages will only be added to the same FlowFile if both of the Kafka Messages have identical keys.") + .required(false) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + static final PropertyDescriptor HEADER_NAME_REGEX = new PropertyDescriptor.Builder() .name("header-name-regex") .displayName("Headers to Add as Attributes (Regex)") @@ -234,6 +245,7 @@ descriptors.add(AUTO_OFFSET_RESET); descriptors.add(KEY_ATTRIBUTE_ENCODING); descriptors.add(MESSAGE_DEMARCATOR); + descriptors.add(SEPARATE_BY_KEY); descriptors.add(MESSAGE_HEADER_ENCODING); descriptors.add(HEADER_NAME_REGEX); descriptors.add(MAX_POLL_RECORDS); @@ -315,6 +327,8 @@ final String headerNameRegex = context.getProperty(HEADER_NAME_REGEX).getValue(); final Pattern headerNamePattern = headerNameRegex == null ?
null : Pattern.compile(headerNameRegex); + final boolean separateByKey = context.getProperty(SEPARATE_BY_KEY).asBoolean(); + if (topicType.equals(TOPIC_NAME.getValue())) { for (final String topic : topicListing.split(",", 100)) { final String trimmedName = topic.trim(); @@ -323,11 +337,11 @@ public class ConsumeKafka_2_6 extends AbstractProcessor { } } - return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, + return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log, honorTransactions, charset, headerNamePattern); } else if (topicType.equals(TOPIC_PATTERN.getValue())) { final Pattern topicPattern = Pattern.compile(topicListing.trim()); - return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, + return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log, honorTransactions, charset, headerNamePattern); } else { getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType}); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index 458165bcc2..c3846a2b13 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -49,6 +49,7 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -83,6 +84,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe private final RecordReaderFactory readerFactory; private final Charset headerCharacterSet; private final Pattern headerNamePattern; + private final boolean separateByKey; private boolean poisoned = false; //used for tracking demarcated flowfiles to their TopicPartition so we can append //to them on subsequent poll calls @@ -103,7 +105,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe final RecordSetWriterFactory writerFactory, final ComponentLog logger, final Charset headerCharacterSet, - final Pattern headerNamePattern) { + final Pattern headerNamePattern, + final boolean separateByKey) { this.maxWaitMillis = maxWaitMillis; this.kafkaConsumer = kafkaConsumer; this.demarcatorBytes = demarcatorBytes; @@ -115,6 +118,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe this.logger = logger; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; } /** @@ -412,7 +416,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe private void writeDemarcatedData(final ProcessSession session, final List> records, final TopicPartition topicPartition) { // Group the Records by their BundleInformation final Map>> map = records.stream() - .collect(Collectors.groupingBy(rec -> new 
BundleInformation(topicPartition, null, getAttributes(rec)))); + .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec), separateByKey ? rec.key() : null))); for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) { final BundleInformation bundleInfo = entry.getKey(); @@ -538,7 +542,7 @@ while ((record = reader.nextRecord()) != null) { // Determine the bundle for this record. final RecordSchema recordSchema = record.getSchema(); - final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes); + final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes, separateByKey ? consumerRecord.key() : null); BundleTracker tracker = bundleMap.get(bundleInfo); if (tracker == null) { @@ -626,9 +630,16 @@ final Map<String, String> kafkaAttrs = new HashMap<>(); kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset)); kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp)); - if (tracker.key != null && tracker.totalRecords == 1) { - kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key); + + // If we have a Kafka key, we will add it as an attribute only if + // the FlowFile contains a single Record, or if the Records have been separated by Key, + // because we then know that even though there are multiple Records, they all have the same key. + if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) { + if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) { + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key); + } } + kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition)); kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic); if (tracker.totalRecords > 1) { @@ -647,8 +658,8 @@ tracker.updateFlowFile(newFlowFile); } - private static class BundleTracker { + private static class BundleTracker { final long initialOffset; final long initialTimestamp; final int partition; @@ -678,23 +689,24 @@ private void updateFlowFile(final FlowFile flowFile) { this.flowFile = flowFile; } - } private static class BundleInformation { private final TopicPartition topicPartition; private final RecordSchema schema; private final Map<String, String> attributes; + private final byte[] messageKey; - public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) { + public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes, final byte[] messageKey) { this.topicPartition = topicPartition; this.schema = schema; this.attributes = attributes; + this.messageKey = messageKey; } @Override public int hashCode() { - return 41 + 13 * topicPartition.hashCode() + ((schema == null) ?
0 : 13 * attributes.hashCode()); + return 41 + Objects.hash(topicPartition, schema, attributes) + 37 * Arrays.hashCode(messageKey); } @Override @@ -710,7 +722,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe } final BundleInformation other = (BundleInformation) obj; - return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes); + return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes) + && Arrays.equals(this.messageKey, other.messageKey); } } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java index 7f02b2605b..2a332980b0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java @@ -59,6 +59,7 @@ public class ConsumerPool implements Closeable { private final RecordSetWriterFactory writerFactory; private final Charset headerCharacterSet; private final Pattern headerNamePattern; + private final boolean separateByKey; private final AtomicLong consumerCreatedCountRef = new AtomicLong(); private final AtomicLong consumerClosedCountRef = new AtomicLong(); private final AtomicLong leasesObtainedCountRef = new AtomicLong(); @@ -86,6 +87,7 @@ public class ConsumerPool implements Closeable { public ConsumerPool( final int maxConcurrentLeases, final byte[] demarcator, + final boolean separateByKey, final Map kafkaProperties, final List topics, final long maxWaitMillis, @@ -111,11 +113,13 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; } public ConsumerPool( final int maxConcurrentLeases, final byte[] demarcator, + final boolean separateByKey, final Map kafkaProperties, final Pattern topics, final long maxWaitMillis, @@ -141,6 +145,7 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; } public ConsumerPool( @@ -155,12 +160,13 @@ public class ConsumerPool implements Closeable { final ComponentLog logger, final boolean honorTransactions, final Charset headerCharacterSet, - final Pattern headerNamePattern) { + final Pattern headerNamePattern, + final boolean separateByKey, + final String keyEncoding) { this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = null; - this.keyEncoding = null; this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.securityProtocol = securityProtocol; @@ -171,6 +177,8 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; + this.keyEncoding = keyEncoding; } public ConsumerPool( @@ -185,12 
+193,13 @@ public class ConsumerPool implements Closeable { final ComponentLog logger, final boolean honorTransactions, final Charset headerCharacterSet, - final Pattern headerNamePattern) { + final Pattern headerNamePattern, + final boolean separateByKey, + final String keyEncoding) { this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases); this.maxWaitMillis = maxWaitMillis; this.logger = logger; this.demarcatorBytes = null; - this.keyEncoding = null; this.readerFactory = readerFactory; this.writerFactory = writerFactory; this.securityProtocol = securityProtocol; @@ -201,6 +210,8 @@ public class ConsumerPool implements Closeable { this.honorTransactions = honorTransactions; this.headerCharacterSet = headerCharacterSet; this.headerNamePattern = headerNamePattern; + this.separateByKey = separateByKey; + this.keyEncoding = keyEncoding; } /** @@ -301,7 +312,7 @@ public class ConsumerPool implements Closeable { private SimpleConsumerLease(final Consumer consumer) { super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers, - readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern); + readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern, separateByKey); this.consumer = consumer; } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index e756776f55..44a6984338 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -50,7 +50,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.regex.Pattern; public final class KafkaProcessorUtils { private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB"; @@ -60,8 +59,8 @@ public final class KafkaProcessorUtils { static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters"); - - static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+"); + static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute", + "The key will not be added as an Attribute"); static final String KAFKA_KEY = "kafka.key"; static final String KAFKA_TOPIC = "kafka.topic"; diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java index 9d53ee652a..d006a6ef92 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java +++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java @@ -70,6 +70,7 @@ public class ConsumerPoolTest { testPool = new ConsumerPool( 1, null, + false, Collections.emptyMap(), Collections.singletonList("nifi"), 100L, @@ -88,6 +89,7 @@ public class ConsumerPoolTest { testDemarcatedPool = new ConsumerPool( 1, "--demarcator--".getBytes(StandardCharsets.UTF_8), + false, Collections.emptyMap(), Collections.singletonList("nifi"), 100L,
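A note on the grouping logic in this patch: Java arrays inherit identity-based equals/hashCode, which is why the new BundleInformation field is compared with Arrays.equals and hashed with Arrays.hashCode rather than with Objects.equals/Objects.hash alone. The sketch below is illustrative only; KeyGroupingSketch and KeyWrapper are not NiFi classes and are not part of this patch. It shows how content-based key equality produces the bundle-per-key behavior that Separate By Key relies on:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyGroupingSketch {

    // Wraps a Kafka message key so that HashMap groups by array contents rather than
    // by reference identity; this mirrors what BundleInformation does with messageKey.
    private static final class KeyWrapper {
        private final byte[] key; // may be null for keyless Kafka messages

        private KeyWrapper(final byte[] key) {
            this.key = key;
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(key); // Arrays.hashCode(null) is 0, so null keys are safe
        }

        @Override
        public boolean equals(final Object obj) {
            return obj instanceof KeyWrapper && Arrays.equals(key, ((KeyWrapper) obj).key);
        }
    }

    public static void main(final String[] args) {
        final List<byte[]> messageKeys = Arrays.asList(
                "user-1".getBytes(StandardCharsets.UTF_8),
                "user-2".getBytes(StandardCharsets.UTF_8),
                "user-1".getBytes(StandardCharsets.UTF_8), // same contents, different array instance
                null);

        // Count messages per distinct key, the way Separate By Key groups records into FlowFiles.
        final Map<KeyWrapper, Integer> bundleSizes = new HashMap<>();
        for (final byte[] key : messageKeys) {
            bundleSizes.merge(new KeyWrapper(key), 1, Integer::sum);
        }

        // Prints 3: both "user-1" messages share a bundle, while "user-2" and the keyless
        // message each get their own. With identity equality it would print 4.
        System.out.println(bundleSizes.size());
    }
}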