Merge pull request #120 from metamx/kafka-0.7.2

Realtime: Update to kafka-0.7.2
This commit is contained in:
cheddar 2013-04-12 11:55:11 -07:00
commit e9f6c7267d
3 changed files with 9 additions and 22 deletions

View File

@ -31,10 +31,6 @@
<version>0.3.35-SNAPSHOT</version>
</parent>
<properties>
<scala.version>2.8.2</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>com.metamx.druid</groupId>
@ -116,7 +112,7 @@
<dependency>
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
<version>0.6-mmx11</version>
<version>0.7.2-mmx1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
@ -124,16 +120,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>com.github.sgroschupf</groupId>
<artifactId>zkclient</artifactId>

View File

@ -28,7 +28,7 @@ import java.io.IOException;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.6.3", value = KafkaFirehoseFactory.class)
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class)
})
public interface FirehoseFactory
{

View File

@ -29,9 +29,10 @@ import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaMessageStream;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.nio.CharBuffer;
@ -76,18 +77,18 @@ public class KafkaFirehoseFactory implements FirehoseFactory
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaMessageStream>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaMessageStream> streamList = streams.get(feed);
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
return null;
}
final KafkaMessageStream stream = streamList.get(0);
final KafkaStream<Message> stream = streamList.get(0);
return new Firehose()
{
Iterator<Message> iter = stream.iterator();
Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
private CharBuffer chars = null;
@Override
@ -99,7 +100,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next();
final Message message = iter.next().message();
if (message == null) {
return null;