Merge branch 'master' into stop_task

This commit is contained in:
Fangjin Yang 2013-04-12 22:13:45 -07:00
commit 6ed0308c59
3 changed files with 9 additions and 22 deletions

View File

@ -31,10 +31,6 @@
<version>0.3.35-SNAPSHOT</version> <version>0.3.35-SNAPSHOT</version>
</parent> </parent>
<properties>
<scala.version>2.8.2</scala.version>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>com.metamx.druid</groupId> <groupId>com.metamx.druid</groupId>
@ -116,7 +112,7 @@
<dependency> <dependency>
<groupId>kafka</groupId> <groupId>kafka</groupId>
<artifactId>core-kafka</artifactId> <artifactId>core-kafka</artifactId>
<version>0.6-mmx11</version> <version>0.7.2-mmx1</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>log4j</groupId> <groupId>log4j</groupId>
@ -124,16 +120,6 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.github.sgroschupf</groupId> <groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId> <artifactId>zkclient</artifactId>

View File

@ -28,7 +28,7 @@ import java.io.IOException;
*/ */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({ @JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.6.3", value = KafkaFirehoseFactory.class) @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class)
}) })
public interface FirehoseFactory public interface FirehoseFactory
{ {

View File

@ -29,9 +29,10 @@ import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import kafka.consumer.Consumer; import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream; import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector; import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message; import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException; import java.io.IOException;
import java.nio.CharBuffer; import java.nio.CharBuffer;
@ -76,18 +77,18 @@ public class KafkaFirehoseFactory implements FirehoseFactory
{ {
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaMessageStream>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaMessageStream> streamList = streams.get(feed); final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) { if (streamList == null || streamList.size() != 1) {
return null; return null;
} }
final KafkaMessageStream stream = streamList.get(0); final KafkaStream<Message> stream = streamList.get(0);
return new Firehose() return new Firehose()
{ {
Iterator<Message> iter = stream.iterator(); Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
private CharBuffer chars = null; private CharBuffer chars = null;
@Override @Override
@ -99,7 +100,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory
@Override @Override
public InputRow nextRow() throws FormattedException public InputRow nextRow() throws FormattedException
{ {
final Message message = iter.next(); final Message message = iter.next().message();
if (message == null) { if (message == null) {
return null; return null;