NIFI-1218 upgraded Kafka to 0.9.0.0 client API Tested and validated that it is still compatible with 0.8.* Kafka brokers

Signed-off-by: jpercivall <joepercivall@yahoo.com>
This commit is contained in:
Oleg Zhurakousky 2015-12-16 12:49:44 -05:00 committed by jpercivall
parent b19ff7cf37
commit 37635232c7
4 changed files with 19 additions and 6 deletions

View File

@ -37,12 +37,12 @@
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId> <artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version> <version>0.9.0.0</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.kafka</groupId> <groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.1</artifactId> <artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version> <version>0.9.0.0</version>
<exclusions> <exclusions>
<!-- Transitive dependencies excluded because they are located <!-- Transitive dependencies excluded because they are located
in a legacy Maven repository, which Maven 3 doesn't support. --> in a legacy Maven repository, which Maven 3 doesn't support. -->

View File

@ -25,6 +25,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
import kafka.admin.AdminUtils; import kafka.admin.AdminUtils;
import kafka.api.TopicMetadata; import kafka.api.TopicMetadata;
import kafka.utils.ZKStringSerializer; import kafka.utils.ZKStringSerializer;
import kafka.utils.ZkUtils;
import scala.collection.JavaConversions; import scala.collection.JavaConversions;
/** /**
@ -50,7 +51,7 @@ class KafkaUtils {
} }
}); });
scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient); .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), ZkUtils.apply(zkClient, false));
return topicMetadatas.size(); return topicMetadatas.size();
} }
} }

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
@ -75,8 +76,6 @@ import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.LongHolder;
import scala.actors.threadpool.Arrays;
@SupportsBatching @SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED) @InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})

View File

@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.BufferExhaustedException; import org.apache.kafka.clients.producer.BufferExhaustedException;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
@ -474,6 +475,18 @@ public class TestPutKafka {
@Override @Override
public void close() { public void close() {
} }
@Override
public void close(long arg0, TimeUnit arg1) {
// TODO Auto-generated method stub
}
@Override
public void flush() {
// TODO Auto-generated method stub
}
} }
} }