NIFI-1684 This closes #308. fixed ZKClient connection leak

Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
Oleg Zhurakousky 2016-03-28 21:48:37 -04:00 committed by joewitt
parent 9912f18de5
commit c3d54ab724
2 changed files with 37 additions and 19 deletions

View File

@ -256,13 +256,14 @@ public class GetKafka extends AbstractProcessor {
props.setProperty("consumer.timeout.ms", "1");
}
int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
final ConsumerConfig consumerConfig = new ConsumerConfig(props);
consumer = Consumer.createJavaConsumerConnector(consumerConfig);
final Map<String, Integer> topicCountMap = new HashMap<>(1);
int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
int concurrentTaskToUse = context.getMaxConcurrentTasks();
if (context.getMaxConcurrentTasks() < partitionCount){
this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
@ -346,14 +347,14 @@ public class GetKafka extends AbstractProcessor {
try {
f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
this.consumerStreamsReady.set(false);
shutdownConsumer();
f.cancel(true);
Thread.currentThread().interrupt();
getLogger().warn("Interrupted while waiting to get connection", e);
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (TimeoutException e) {
this.consumerStreamsReady.set(false);
shutdownConsumer();
f.cancel(true);
getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e);
}
@ -374,14 +375,14 @@ public class GetKafka extends AbstractProcessor {
try {
consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
this.consumerStreamsReady.set(false);
shutdownConsumer();
consumptionFuture.cancel(true);
Thread.currentThread().interrupt();
getLogger().warn("Interrupted while consuming messages", e);
} catch (ExecutionException e) {
throw new IllegalStateException(e);
} catch (TimeoutException e) {
this.consumerStreamsReady.set(false);
shutdownConsumer();
consumptionFuture.cancel(true);
getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e);
}

View File

@ -33,25 +33,42 @@ import scala.collection.JavaConversions;
*/
class KafkaUtils {
/**
* Will retrieve the amount of partitions for a given Kafka topic.
*/
static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
ZkClient zkClient = new ZkClient(zookeeperConnectionString);
ZkClient zkClient = null;
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return ZKStringSerializer.serialize(o);
}
try {
zkClient = new ZkClient(zookeeperConnectionString);
zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object o) throws ZkMarshallingError {
return ZKStringSerializer.serialize(o);
}
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(bytes);
@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return ZKStringSerializer.deserialize(bytes);
}
});
scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
if (topicMetadatas != null && topicMetadatas.size() > 0) {
return JavaConversions.asJavaSet(topicMetadatas).iterator().next().partitionsMetadata().size();
} else {
throw new IllegalStateException("Failed to get metadata for topic " + topicName);
}
});
scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
return topicMetadatas.size();
} catch (Exception e) {
throw new IllegalStateException("Failed to retrieve partitions for topic " + topicName, e);
} finally {
try {
zkClient.close();
} catch (Exception e2) {
// ignore
}
}
}
}