mirror of https://github.com/apache/nifi.git
NIFI-1192: Removed some additional white space
This commit is contained in:
parent
657885e5ba
commit
4fa2a713a2
|
@ -223,8 +223,7 @@ public class GetKafka extends AbstractProcessor {
|
||||||
if (descriptor.isDynamic()) {
|
if (descriptor.isDynamic()) {
|
||||||
if (props.containsKey(descriptor.getName())) {
|
if (props.containsKey(descriptor.getName())) {
|
||||||
this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
|
this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
|
||||||
+ props.getProperty(descriptor.getName()) + "' with dynamically set value '"
|
+ props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'.");
|
||||||
+ entry.getValue() + "'.");
|
|
||||||
}
|
}
|
||||||
props.setProperty(descriptor.getName(), entry.getValue());
|
props.setProperty(descriptor.getName(), entry.getValue());
|
||||||
}
|
}
|
||||||
|
@ -251,8 +250,7 @@ public class GetKafka extends AbstractProcessor {
|
||||||
|
|
||||||
final Map<String, Integer> topicCountMap = new HashMap<>(1);
|
final Map<String, Integer> topicCountMap = new HashMap<>(1);
|
||||||
|
|
||||||
int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
|
int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
|
||||||
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
|
|
||||||
|
|
||||||
int concurrentTaskToUse = context.getMaxConcurrentTasks();
|
int concurrentTaskToUse = context.getMaxConcurrentTasks();
|
||||||
if (context.getMaxConcurrentTasks() < partitionCount){
|
if (context.getMaxConcurrentTasks() < partitionCount){
|
||||||
|
@ -263,8 +261,7 @@ public class GetKafka extends AbstractProcessor {
|
||||||
concurrentTaskToUse = partitionCount;
|
concurrentTaskToUse = partitionCount;
|
||||||
this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
|
this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
|
||||||
+ "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
|
+ "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
|
||||||
+ "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to "
|
+ "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events");
|
||||||
+ "consume events");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
topicCountMap.put(topic, concurrentTaskToUse);
|
topicCountMap.put(topic, concurrentTaskToUse);
|
||||||
|
|
Loading…
Reference in New Issue