From 4d7dace0333fac63a79dc0d42fa1ff9ccef82e43 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 10 Apr 2013 10:02:24 -0700 Subject: [PATCH] Realtime: Update to kafka-0.7.2 Firehose configs will need to have their "kafka-0.6.3" type changed to "kafka-0.7.2", and "/kafka" added to the zk.connect string in consumerProps (kafka-0.6-mmx11 hardcoded this path). --- realtime/pom.xml | 16 +--------------- .../metamx/druid/realtime/FirehoseFactory.java | 2 +- .../druid/realtime/KafkaFirehoseFactory.java | 13 +++++++------ 3 files changed, 9 insertions(+), 22 deletions(-) diff --git a/realtime/pom.xml b/realtime/pom.xml index 330439bcb4a..9576b3206ba 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -31,10 +31,6 @@ 0.3.35-SNAPSHOT - - 2.8.2 - - com.metamx.druid @@ -116,7 +112,7 @@ kafka core-kafka - 0.6-mmx11 + 0.7.2-mmx1 log4j @@ -124,16 +120,6 @@ - - org.scala-lang - scala-library - ${scala.version} - - - org.scala-lang - scala-compiler - ${scala.version} - com.github.sgroschupf zkclient diff --git a/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java index a3481f84ad8..4278f3222b9 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/FirehoseFactory.java @@ -28,7 +28,7 @@ import java.io.IOException; */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes({ - @JsonSubTypes.Type(name = "kafka-0.6.3", value = KafkaFirehoseFactory.class) + @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class) }) public interface FirehoseFactory { diff --git a/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java index bb60c3e829b..b5beed9a92e 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/KafkaFirehoseFactory.java @@ -29,9 +29,10 @@ import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.input.InputRow; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaMessageStream; +import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.Message; +import kafka.message.MessageAndMetadata; import java.io.IOException; import java.nio.CharBuffer; @@ -76,18 +77,18 @@ public class KafkaFirehoseFactory implements FirehoseFactory { final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); - final Map> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); + final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); - final List streamList = streams.get(feed); + final List> streamList = streams.get(feed); if (streamList == null || streamList.size() != 1) { return null; } - final KafkaMessageStream stream = streamList.get(0); + final KafkaStream stream = streamList.get(0); return new Firehose() { - Iterator iter = stream.iterator(); + Iterator> iter = stream.iterator(); private CharBuffer chars = null; @Override @@ -99,7 +100,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory @Override public InputRow nextRow() throws FormattedException { - final Message message = iter.next(); + final Message message = iter.next().message(); if (message == null) { return null;