From 133838a93fa5a5771e9812a14845003ad8782c96 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Tue, 12 Jan 2016 10:56:49 -0500 Subject: [PATCH] NIFI-1233 upgraded to Kafka 0.9.0.0 Signed-off-by: jpercivall --- .../nifi-kafka-bundle/nifi-kafka-processors/pom.xml | 6 +++--- .../org/apache/nifi/processors/kafka/KafkaUtils.java | 4 +++- .../org/apache/nifi/processors/kafka/PutKafka.java | 5 ++--- .../apache/nifi/processors/kafka/TestPutKafka.java | 11 +++++++++++ 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml index 2eee0506bd..fb59e693bc 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml @@ -37,12 +37,12 @@ org.apache.kafka kafka-clients - 0.8.2.2 + 0.9.0.0 org.apache.kafka - kafka_2.9.1 - 0.8.2.2 + kafka_2.10 + 0.9.0.0 diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java index 657d88b48e..d09ac4a1a7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java @@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; import kafka.api.TopicMetadata; import kafka.utils.ZKStringSerializer; +import kafka.utils.ZkUtils; import scala.collection.JavaConversions; /** @@ -38,6 +39,7 @@ class KafkaUtils { */ static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) { ZkClient zkClient = new ZkClient(zookeeperConnectionString); + zkClient.setZkSerializer(new ZkSerializer() { @Override public byte[] serialize(Object o) throws ZkMarshallingError { @@ -50,7 +52,7 @@ class KafkaUtils { } }); scala.collection.Set topicMetadatas = AdminUtils - .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); + .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false)); return topicMetadatas.size(); } } diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index b5766e4886..febb666d3a 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -20,14 +20,15 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; -import java.util.Map.Entry; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.util.LongHolder; -import scala.actors.threadpool.Arrays; - @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java index 17d1cc831a..e12ec2aeb4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.Callback; @@ -474,6 +475,16 @@ public class TestPutKafka { @Override public void close() { } + + @Override + public void close(long arg0, TimeUnit arg1) { + // ignore, not used in test + } + + @Override + public void flush() { + // ignore, not used in test + } } }