diff --git a/realtime/pom.xml b/realtime/pom.xml
index 330439bcb4a..9576b3206ba 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -31,10 +31,6 @@
0.3.35-SNAPSHOT
-
- 2.8.2
-
-
com.metamx.druid
@@ -116,7 +112,7 @@
kafka
core-kafka
- 0.6-mmx11
+ 0.7.2-mmx1
log4j
@@ -124,16 +120,6 @@
-
- org.scala-lang
- scala-library
- ${scala.version}
-
-
- org.scala-lang
- scala-compiler
- ${scala.version}
-
com.github.sgroschupf
zkclient
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java
index a3481f84ad8..4278f3222b9 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java
@@ -28,7 +28,7 @@ import java.io.IOException;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(name = "kafka-0.6.3", value = KafkaFirehoseFactory.class)
+ @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class)
})
public interface FirehoseFactory
{
diff --git a/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java
index bb60c3e829b..b5beed9a92e 100644
--- a/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java
+++ b/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java
@@ -29,9 +29,10 @@ import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaMessageStream;
+import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
+import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.nio.CharBuffer;
@@ -76,18 +77,18 @@ public class KafkaFirehoseFactory implements FirehoseFactory
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
- final Map> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
+ final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
- final List streamList = streams.get(feed);
+ final List> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
return null;
}
- final KafkaMessageStream stream = streamList.get(0);
+ final KafkaStream stream = streamList.get(0);
return new Firehose()
{
- Iterator iter = stream.iterator();
+ Iterator> iter = stream.iterator();
private CharBuffer chars = null;
@Override
@@ -99,7 +100,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory
@Override
public InputRow nextRow() throws FormattedException
{
- final Message message = iter.next();
+ final Message message = iter.next().message();
if (message == null) {
return null;