From c3d54ab7246fba5ea1432d949c79f97f0e97f25c Mon Sep 17 00:00:00 2001
From: Oleg Zhurakousky
Date: Mon, 28 Mar 2016 21:48:37 -0400
Subject: [PATCH] NIFI-1684 This closes #308. fixed ZKClient connection leak

Signed-off-by: joewitt
---
 .../nifi/processors/kafka/GetKafka.java       | 13 +++---
 .../nifi/processors/kafka/KafkaUtils.java     | 43 +++++++++++++------
 2 files changed, 37 insertions(+), 19 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index e06befb504..76603059a2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -256,13 +256,14 @@ public class GetKafka extends AbstractProcessor {
             props.setProperty("consumer.timeout.ms", "1");
         }
 
+        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
+                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
 
         final Map<String, Integer> topicCountMap = new HashMap<>(1);
 
-        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
-
         int concurrentTaskToUse = context.getMaxConcurrentTasks();
         if (context.getMaxConcurrentTasks() < partitionCount){
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
@@ -346,14 +347,14 @@ public class GetKafka extends AbstractProcessor {
             try {
                 f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 f.cancel(true);
                 Thread.currentThread().interrupt();
                 getLogger().warn("Interrupted while waiting to get connection", e);
             } catch (ExecutionException e) {
                 throw new IllegalStateException(e);
             } catch (TimeoutException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 f.cancel(true);
                 getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
             }
@@ -374,14 +375,14 @@ public class GetKafka extends AbstractProcessor {
             try {
                 consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 consumptionFuture.cancel(true);
                 Thread.currentThread().interrupt();
                 getLogger().warn("Interrupted while consuming messages", e);
             } catch (ExecutionException e) {
                 throw new IllegalStateException(e);
             } catch (TimeoutException e) {
-                this.consumerStreamsReady.set(false);
+                shutdownConsumer();
                 consumptionFuture.cancel(true);
                 getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
index a725c2b457..8ddea61907 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -33,25 +33,42 @@ import scala.collection.JavaConversions;
  */
 class KafkaUtils {
 
+
     /**
      * Will retrieve the amount of partitions for a given Kafka topic.
      */
     static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
-        ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+        ZkClient zkClient = null;
 
-        zkClient.setZkSerializer(new ZkSerializer() {
-            @Override
-            public byte[] serialize(Object o) throws ZkMarshallingError {
-                return ZKStringSerializer.serialize(o);
-            }
+        try {
+            zkClient = new ZkClient(zookeeperConnectionString);
+            zkClient.setZkSerializer(new ZkSerializer() {
+                @Override
+                public byte[] serialize(Object o) throws ZkMarshallingError {
+                    return ZKStringSerializer.serialize(o);
+                }
 
-            @Override
-            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
-                return ZKStringSerializer.deserialize(bytes);
+                @Override
+                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                    return ZKStringSerializer.deserialize(bytes);
+                }
+            });
+            scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+                    .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+            if (topicMetadatas != null && topicMetadatas.size() > 0) {
+                return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
+            } else {
+                throw new IllegalStateException("Failed to get metadata for topic " + topicName);
             }
-        });
-        scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
-                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
-        return topicMetadatas.size();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
+        } finally {
+            try {
+                zkClient.close();
+            } catch (Exception e2) {
+                // ignore
+            }
+        }
     }
+
 }
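Note (illustration only, not part of the patch): the sketch below isolates the pattern the KafkaUtils change applies, namely that the ZkClient opened for the metadata lookup is always closed in a finally block so the ZooKeeper connection cannot leak. The class name PartitionCountExample, the localhost connection string, and the topic name are made-up placeholders; the sketch assumes the same I0Itec ZkClient and Kafka 0.8.x AdminUtils/ZKStringSerializer APIs that KafkaUtils already imports.

import java.util.Collections;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata;
import kafka.utils.ZKStringSerializer;
import scala.collection.JavaConversions;

// Hypothetical standalone example; names and addresses are placeholders.
public class PartitionCountExample {

    public static void main(String[] args) {
        System.out.println(partitionCount("localhost:2181", "my-topic"));
    }

    static int partitionCount(String zookeeperConnectionString, String topicName) {
        ZkClient zkClient = null;
        try {
            zkClient = new ZkClient(zookeeperConnectionString);
            // Kafka keeps its ZooKeeper data as strings, so install Kafka's serializer.
            zkClient.setZkSerializer(new ZkSerializer() {
                @Override
                public byte[] serialize(Object o) throws ZkMarshallingError {
                    return ZKStringSerializer.serialize(o);
                }

                @Override
                public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                    return ZKStringSerializer.deserialize(bytes);
                }
            });
            scala.collection.Set<TopicMetadata> metadatas = AdminUtils
                    .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
            if (metadatas != null && metadatas.size() > 0) {
                return JavaConversions.asJavaSet(metadatas).iterator().next().partitionsMetadata().size();
            }
            throw new IllegalStateException("Failed to get metadata for topic " + topicName);
        } finally {
            // The fix in a nutshell: release the ZooKeeper connection on every code path.
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
}

A fuller version could also catch the lookup failure and rethrow it with context, as the patched KafkaUtils.retrievePartitionCountForTopic does.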