mirror of https://github.com/apache/druid.git
Realtime: Update to kafka-0.7.2
Firehose configs will need to have their "kafka-0.6.3" type changed to "kafka-0.7.2", and "/kafka" added to the zk.connect string in consumerProps (kafka-0.6-mmx11 hardcoded this path).
This commit is contained in:
parent
55648c47a7
commit
4d7dace033
|
@ -31,10 +31,6 @@
|
|||
<version>0.3.35-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
<scala.version>2.8.2</scala.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.metamx.druid</groupId>
|
||||
|
@ -116,7 +112,7 @@
|
|||
<dependency>
|
||||
<groupId>kafka</groupId>
|
||||
<artifactId>core-kafka</artifactId>
|
||||
<version>0.6-mmx11</version>
|
||||
<version>0.7.2-mmx1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>log4j</groupId>
|
||||
|
@ -124,16 +120,6 @@
|
|||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-library</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.scala-lang</groupId>
|
||||
<artifactId>scala-compiler</artifactId>
|
||||
<version>${scala.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.github.sgroschupf</groupId>
|
||||
<artifactId>zkclient</artifactId>
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.io.IOException;
|
|||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes({
|
||||
@JsonSubTypes.Type(name = "kafka-0.6.3", value = KafkaFirehoseFactory.class)
|
||||
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class)
|
||||
})
|
||||
public interface FirehoseFactory
|
||||
{
|
||||
|
|
|
@ -29,9 +29,10 @@ import com.metamx.druid.indexer.data.StringInputRowParser;
|
|||
import com.metamx.druid.input.InputRow;
|
||||
import kafka.consumer.Consumer;
|
||||
import kafka.consumer.ConsumerConfig;
|
||||
import kafka.consumer.KafkaMessageStream;
|
||||
import kafka.consumer.KafkaStream;
|
||||
import kafka.javaapi.consumer.ConsumerConnector;
|
||||
import kafka.message.Message;
|
||||
import kafka.message.MessageAndMetadata;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.CharBuffer;
|
||||
|
@ -76,18 +77,18 @@ public class KafkaFirehoseFactory implements FirehoseFactory
|
|||
{
|
||||
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
|
||||
|
||||
final Map<String, List<KafkaMessageStream>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
|
||||
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
|
||||
|
||||
final List<KafkaMessageStream> streamList = streams.get(feed);
|
||||
final List<KafkaStream<Message>> streamList = streams.get(feed);
|
||||
if (streamList == null || streamList.size() != 1) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final KafkaMessageStream stream = streamList.get(0);
|
||||
final KafkaStream<Message> stream = streamList.get(0);
|
||||
|
||||
return new Firehose()
|
||||
{
|
||||
Iterator<Message> iter = stream.iterator();
|
||||
Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
|
||||
private CharBuffer chars = null;
|
||||
|
||||
@Override
|
||||
|
@ -99,7 +100,7 @@ public class KafkaFirehoseFactory implements FirehoseFactory
|
|||
@Override
|
||||
public InputRow nextRow() throws FormattedException
|
||||
{
|
||||
final Message message = iter.next();
|
||||
final Message message = iter.next().message();
|
||||
|
||||
if (message == null) {
|
||||
return null;
|
||||
|
|
Loading…
Reference in New Issue