diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java index 8f8d2e96eb..5e7a7ae48d 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java @@ -223,8 +223,7 @@ public class GetKafka extends AbstractProcessor { if (descriptor.isDynamic()) { if (props.containsKey(descriptor.getName())) { this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" - + props.getProperty(descriptor.getName()) + "' with dynamically set value '" - + entry.getValue() + "'."); + + props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'."); } props.setProperty(descriptor.getName(), entry.getValue()); } @@ -251,20 +250,18 @@ public class GetKafka extends AbstractProcessor { final Map topicCountMap = new HashMap<>(1); - int partitionCount = KafkaUtils.retrievePartitionCountForTopic( - context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); + int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); int concurrentTaskToUse = context.getMaxConcurrentTasks(); if (context.getMaxConcurrentTasks() < partitionCount){ this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " - + "Consider making it equal to the amount of partition count for most efficient event consumption."); + + "Consider making it equal to the amount of partition count for most efficient event consumption."); } else if (context.getMaxConcurrentTasks() > partitionCount){ concurrentTaskToUse = partitionCount; this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " - + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to " - + "consume events"); + + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events"); } topicCountMap.put(topic, concurrentTaskToUse);