NIFI-12851 - ConsumeKafka, remove limitation on count of subscribed topics

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8460.
This commit is contained in:
Paul Grey 2024-02-28 16:39:10 -05:00 committed by Pierre Villard
parent 1cb0a53711
commit b368c281e8
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
3 changed files with 40 additions and 2 deletions

View File

@ -451,7 +451,7 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements KafkaCl
}
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
for (final String topic : topicListing.split(",")) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);

View File

@ -397,7 +397,7 @@ public class ConsumeKafka_2_6 extends AbstractProcessor implements KafkaClientCo
}
if (topicType.equals(TOPIC_NAME.getValue())) {
for (final String topic : topicListing.split(",", 100)) {
for (final String topic : topicListing.split(",")) {
final String trimmedName = topic.trim();
if (!trimmedName.isEmpty()) {
topics.add(trimmedName);

View File

@ -22,12 +22,20 @@ import org.apache.nifi.kafka.shared.property.SaslMechanism;
import org.apache.nifi.kafka.shared.property.SecurityProtocol;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.kerberos.SelfContainedKerberosUserService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@ -44,6 +52,36 @@ public class TestConsumeKafka_2_6 {
mockConsumerPool = mock(ConsumerPool.class);
}
@Test
public void validateNoLimitToTopicCount() {
final int expectedCount = 101;
final String topics = String.join(",", Collections.nCopies(expectedCount, "foo"));
final ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6() {
protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
final ConsumerPool consumerPool = super.createConsumerPool(context, log);
try {
final Field topicsField = ConsumerPool.class.getDeclaredField("topics");
topicsField.setAccessible(true);
final Object o = topicsField.get(consumerPool);
final List<?> list = assertInstanceOf(List.class, o);
assertEquals(expectedCount, list.size());
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
return consumerPool;
}
};
TestRunner runner = TestRunners.newTestRunner(consumeKafka);
runner.setValidateExpressionUsage(false);
runner.setProperty(ConsumeKafka_2_6.BOOTSTRAP_SERVERS, "localhost:1234");
runner.setProperty(ConsumeKafka_2_6.TOPICS, topics);
runner.setProperty(ConsumeKafka_2_6.GROUP_ID, "foo");
runner.setProperty(ConsumeKafka_2_6.AUTO_OFFSET_RESET, ConsumeKafka_2_6.OFFSET_EARLIEST);
runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
runner.run();
}
@Test
public void validateCustomValidatorSettings() {
ConsumeKafka_2_6 consumeKafka = new ConsumeKafka_2_6();