From b368c281e8e6c25e040b083db6ebaaf95ec04c9a Mon Sep 17 00:00:00 2001 From: Paul Grey Date: Wed, 28 Feb 2024 16:39:10 -0500 Subject: [PATCH] NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics Signed-off-by: Pierre Villard This closes #8460. --- .../kafka/pubsub/ConsumeKafkaRecord_2_6.java | 2 +- .../kafka/pubsub/ConsumeKafka_2_6.java | 2 +- .../kafka/pubsub/TestConsumeKafka_2_6.java | 38 +++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java index 782d43de24..5e263f0599 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java @@ -451,7 +451,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl } if (topicType.equals(TOPIC_NAME.getValue())) { - for (final String topic : topicListing.split(",", 100)) { + for (final String topic : topicListing.split(",")) { final String trimmedName = topic.trim(); if (!trimmedName.isEmpty()) { topics.add(trimmedName); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java index 043e81a313..784121e89e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_6.java @@ -397,7 +397,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo } if (topicType.equals(TOPIC_NAME.getValue())) { - for (final String topic : topicListing.split(",", 100)) { + for (final String topic : topicListing.split(",")) { final String trimmedName = topic.trim(); if (!trimmedName.isEmpty()) { topics.add(trimmedName); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java index 872df85cb9..42ab9d82ad 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_6.java @@ -22,12 +22,20 @@ import org.apache.nifi.kafka.shared.property.SaslMechanism; import org.apache.nifi.kafka.shared.property.SecurityProtocol; import org.apache.nifi.kerberos.KerberosUserService; import org.apache.nifi.kerberos.SelfContainedKerberosUserService; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; @@ -44,6 +52,36 @@ public class TestConsumeKafka_2_6 { mockConsumerPool = mock(ConsumerPool.class); } + @Test + public void validateNoLimitToTopicCount() { + final int expectedCount = 101; + final String topics = String.join(",", Collections.nCopies(expectedCount, "foo")); + final ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6() { + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + final ConsumerPool consumerPool = super.createConsumerPool(context, log); + try { + final Field topicsField = ConsumerPool.class.getDeclaredField("topics"); + topicsField.setAccessible(true); + final Object o = topicsField.get(consumerPool); + final List list = assertInstanceOf(List.class, o); + assertEquals(expectedCount, list.size()); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException(e); + } + return consumerPool; + } + }; + + TestRunner runner = TestRunners.newTestRunner(consumeKafka); + runner.setValidateExpressionUsage(false); + runner.setProperty(ConsumeKafka_2_6.BOOTSTRAP_SERVERS, "localhost:1234"); + runner.setProperty(ConsumeKafka_2_6.TOPICS, topics); + runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo"); + runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST); + runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + runner.run(); + } + @Test public void validateCustomValidatorSettings() { ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();