diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index e859f94ce5..4da485f252 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
@@ -78,6 +79,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none",
             "Throw exception to the consumer if no previous offset is found for the consumer's group");

+    static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+
+    static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -87,6 +92,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .build();

+    static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
+            .name("topic_type")
+            .displayName("Topic Name Format")
+            .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
+            .required(true)
+            .allowableValues(TOPIC_NAME, TOPIC_PATTERN)
+            .defaultValue(TOPIC_NAME.getValue())
+            .build();
+
     static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
             .name(ConsumerConfig.GROUP_ID_CONFIG)
             .displayName("Group ID")
@@ -166,6 +180,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
         descriptors.add(TOPICS);
+        descriptors.add(TOPIC_TYPE);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
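Note on what the two allowable values select between downstream: "names" resolves to the list-based `KafkaConsumer.subscribe` overload, while "pattern" resolves to the `Pattern` overload, which requires a `ConsumerRebalanceListener` (in the patch the lease itself serves as that listener). Below is a minimal standalone sketch, not part of the patch, contrasting the two styles in the 0.10 client; the broker address, group id, and topic values are illustrative placeholders.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SubscriptionStylesSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "sketch-group");            // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        final boolean usePattern = args.length > 0 && "pattern".equals(args[0]);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            if (usePattern) {
                // "pattern" mode: the regex is matched against the topics known to the
                // broker; the Pattern overload requires a rebalance listener.
                consumer.subscribe(Pattern.compile("(fo.*)|(ba)"), new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
                    }

                    @Override
                    public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
                    }
                });
            } else {
                // "names" mode: a fixed, explicit list of topic names.
                consumer.subscribe(Arrays.asList("foo", "bar"));
            }
            consumer.poll(100); // single poll, just to exercise the subscription
        }
    }
}
```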
@@ -229,18 +244,26 @@
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
+        final String topicType = context.getProperty(ConsumeKafka_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
-        for (final String topic : topicListing.split(",", 100)) {
-            final String trimmedName = topic.trim();
-            if (!trimmedName.isEmpty()) {
-                topics.add(trimmedName);
-            }
-        }
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
         final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
-
-        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+        if (topicType.equals(TOPIC_NAME.getValue())) {
+            for (final String topic : topicListing.split(",", 100)) {
+                final String trimmedName = topic.trim();
+                if (!trimmedName.isEmpty()) {
+                    topics.add(trimmedName);
+                }
+            }
+            return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+        } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
+            final Pattern topicPattern = Pattern.compile(topicListing.trim());
+            return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+        } else {
+            getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+            return null;
+        }
     }

     @OnUnscheduled
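One behavioral note: in "pattern" mode the whole Topic(s) value is compiled as a single regex, and the 0.10 consumer client full-matches that pattern against each topic name it discovers in broker metadata (rather than doing a substring find). A minimal sketch, not part of the patch, of what the pattern used by the new unit test would and would not match; the topic names here are made up:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class TopicPatternMatchSketch {

    public static void main(final String[] args) {
        // Same pattern as the new unit test; topic names are illustrative.
        final Pattern topicPattern = Pattern.compile("(fo.*)|(ba)");
        for (final String topic : Arrays.asList("foo", "foobar", "ba", "bar")) {
            // Full match, mirroring the consumer's metadata check:
            // foo -> true, foobar -> true, ba -> true, bar -> false.
            System.out.printf("%s -> %b%n", topic, topicPattern.matcher(topic).matches());
        }
    }
}
```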
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index baacdc7619..b375b34afc 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -40,6 +41,7 @@ public class ConsumerPool implements Closeable {

     private final BlockingQueue<SimpleConsumerLease> pooledLeases;
     private final List<String> topics;
+    private final Pattern topicPattern;
     private final Map<String, Object> kafkaProperties;
     private final long maxWaitMillis;
     private final ComponentLog logger;
@@ -74,7 +76,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
-            final Map<String, String> kafkaProperties,
+            final Map<String, Object> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
             final String keyEncoding,
@@ -90,6 +92,29 @@ public class ConsumerPool implements Closeable {
         this.bootstrapServers = bootstrapServers;
         this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
         this.topics = Collections.unmodifiableList(topics);
+        this.topicPattern = null;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
     }

     /**
@@ -119,7 +144,11 @@
              * This subscription tightly couples the lease to the given
              * consumer. They cannot be separated from then on.
              */
-            consumer.subscribe(topics, lease);
+            if (topics != null) {
+                consumer.subscribe(topics, lease);
+            } else {
+                consumer.subscribe(topicPattern, lease);
+            }
         }
         lease.setProcessSession(session);
         leasesObtainedCountRef.incrementAndGet();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 9b380d568a..4a5c4fbdc9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -133,6 +133,38 @@
         verifyNoMoreInteractions(mockLease);
     }

+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
     @Test
     public void validateGetErrorMessages() throws Exception {
         String groupName = "validateGetErrorMessages";
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index 130609d2e6..c963f120b3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -54,5 +54,5 @@
             <version>1.2.0-SNAPSHOT</version>
-
+
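For comparison with the pattern-mode test above, a minimal sketch, not part of the patch, of how the default "names" mode would be configured through the same test harness. It assumes the same package and test fixtures as ConsumeKafkaTest; the topic names and group id are illustrative.

```java
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class ConsumeKafkaNamesModeSketch {

    @Test
    public void validateNamesModeConfiguration() {
        final TestRunner runner = TestRunners.newTestRunner(new ConsumeKafka_0_10());
        runner.setValidateExpressionUsage(false);
        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");   // comma separated names
        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "names"); // the default value
        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, "validateNamesModeConfiguration");
        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
        runner.assertValid();
    }
}
```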