doesnt work yet

fjy 2013-11-11 14:04:03 -08:00
parent 0681fb614d
commit 51aa7a2284
10 changed files with 319 additions and 174 deletions

kafka-seven/pom.xml (new file, 59 lines)

@@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012, 2013 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-seven</artifactId>
<name>druid-kafka-seven</name>
<description>druid-kafka-seven</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.11-SNAPSHOT</version>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
<version>0.7.2-mmx1</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
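Not shown in this commit: a node would still have to pull the new extension onto its classpath, presumably through Druid's extension loading (e.g. a coordinate like io.druid.extensions:druid-kafka-seven in druid.extensions.coordinates). The log4j exclusion above keeps the Kafka 0.7 jar from dragging its own logging binding along with it.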

KafkaSevenDruidModule.java (new file, 51 lines)

@@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.firehose.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
/**
*/
public class KafkaSevenDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("KafkaSevenFirehoseModule")
.registerSubtypes(
new NamedType(KafkaSevenFirehoseFactory.class, "kafka-0.7.2")
)
);
}
@Override
public void configure(Binder binder)
{
}
}
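For context, here is a self-contained toy (Factory and KafkaFactory are illustrative names, not Druid classes) of the Jackson mechanism the module relies on: registerSubtypes plus a NamedType map the "type" discriminator in JSON onto a concrete class, which is how a firehose spec with "type": "kafka-0.7.2" would select KafkaSevenFirehoseFactory.

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class NamedTypeDemo
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  public interface Factory {}

  public static class KafkaFactory implements Factory {}

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(
        new SimpleModule("demo")
            .registerSubtypes(new NamedType(KafkaFactory.class, "kafka-0.7.2"))
    );

    // The "type" field picks the registered subtype, just as a Druid firehose
    // spec with "type": "kafka-0.7.2" selects KafkaSevenFirehoseFactory.
    final Factory f = mapper.readValue("{\"type\": \"kafka-0.7.2\"}", Factory.class);
    System.out.println(f.getClass().getSimpleName()); // prints "KafkaFactory"
  }
}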

KafkaSevenFirehoseFactory.java (new file, 155 lines)

@@ -0,0 +1,155 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
public class KafkaSevenFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(KafkaSevenFirehoseFactory.class);
private final Properties consumerProps;
private final String feed;
private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaSevenFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") ByteBufferInputRowParser parser
)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = parser;
parser.addDimensionExclusion("feed");
}
@JsonProperty
public Properties getConsumerProps()
{
return consumerProps;
}
@JsonProperty
public String getFeed()
{
return feed;
}
@JsonProperty
public ByteBufferInputRowParser getParser()
{
return parser;
}
@Override
public Firehose connect() throws IOException
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
return null;
}
final KafkaStream<Message> stream = streamList.get(0);
final Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return iter.hasNext();
}
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next().message();
if (message == null) {
return null;
}
return parseMessage(message);
}
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(message.payload());
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
* This is actually not going to do exactly what we want, cause it
* will be called asynchronously after the persist is complete. So,
* it's going to commit that it's processed more than was actually
* persisted. This is unfortunate, but good enough for now. Should
* revisit along with an upgrade of our Kafka version.
*/
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public void close() throws IOException
{
connector.shutdown();
}
};
}
}
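A hedged sketch of the consumerProps a spec might feed this factory. Property names follow the Kafka 0.7 consumer conventions ("zk.connect", "groupid" rather than the 0.8+ spellings), and the host and group id are placeholders, not values from this commit.

import java.util.Properties;

public class ExampleConsumerProps
{
  public static Properties build()
  {
    final Properties props = new Properties();
    props.setProperty("zk.connect", "localhost:2181");  // ZooKeeper quorum (placeholder)
    props.setProperty("groupid", "druid-example");      // consumer group id (placeholder)
    props.setProperty("autocommit.enable", "false");    // defer to Firehose.commit() above
    return props;
  }
}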

META-INF/services/io.druid.initialization.DruidModule (new file, 1 line)

@@ -0,0 +1 @@
io.druid.firehose.kafka.KafkaSevenDruidModule
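This one-line file is a standard java.util.ServiceLoader registration: the file is named after the service interface and each line names an implementation. A minimal sketch of the discovery it enables, assuming Druid resolves extension modules through ServiceLoader (which is what this file's location implies):

import io.druid.initialization.DruidModule;
import java.util.ServiceLoader;

public class ModuleDiscovery
{
  public static void main(String[] args)
  {
    // ServiceLoader reads META-INF/services/io.druid.initialization.DruidModule
    // from every jar on the classpath and instantiates each listed class.
    for (DruidModule module : ServiceLoader.load(DruidModule.class)) {
      System.out.println("discovered: " + module.getClass().getName());
    }
  }
}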

pom.xml (15 lines changed)

@@ -48,12 +48,14 @@
 <module>examples</module>
 <module>indexing-hadoop</module>
 <module>indexing-service</module>
-<module>processing</module>
 <module>server</module>
 <module>services</module>
+<module>processing</module>
+<!-- Non-default modules -->
 <module>cassandra-storage</module>
 <module>hdfs-storage</module>
 <module>s3-extensions</module>
+<module>kafka-seven</module>
 </modules>
<dependencyManagement>
@@ -365,17 +367,6 @@
 <artifactId>aether-api</artifactId>
 <version>0.9.0.M2</version>
 </dependency>
-<dependency>
-<groupId>kafka</groupId>
-<artifactId>core-kafka</artifactId>
-<version>0.7.2-mmx1</version>
-<exclusions>
-<exclusion>
-<groupId>log4j</groupId>
-<artifactId>log4j</artifactId>
-</exclusion>
-</exclusions>
-</dependency>
 <dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
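With the managed version gone from the root pom, the 0.7.2-mmx1 version and the log4j exclusion now live inline in kafka-seven/pom.xml above; the core modules no longer reference a Kafka dependency at all.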

S3FirehoseDruidModule.java (new file, 48 lines)

@@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.firehose.s3;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
/**
*/
public class S3FirehoseDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule().registerSubtypes(new NamedType(StaticS3FirehoseFactory.class, "static-s3"))
);
}
@Override
public void configure(Binder binder)
{
}
}

META-INF/services/io.druid.initialization.DruidModule (s3-extensions)

@@ -1 +1,2 @@
-io.druid.storage.s3.S3StorageDruidModule
+io.druid.storage.s3.S3StorageDruidModule
+io.druid.firehose.s3.S3FirehoseDruidModule

server/pom.xml

@@ -183,10 +183,6 @@
 <groupId>org.eclipse.jetty</groupId>
 <artifactId>jetty-servlets</artifactId>
 </dependency>
-<dependency>
-<groupId>kafka</groupId>
-<artifactId>core-kafka</artifactId>
-</dependency>
 <dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>

FirehoseModule.java

@@ -27,7 +27,6 @@ import io.druid.data.input.ProtoBufInputRowParser;
 import io.druid.initialization.DruidModule;
 import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
 import io.druid.segment.realtime.firehose.IrcFirehoseFactory;
-import io.druid.segment.realtime.firehose.KafkaFirehoseFactory;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory;
 import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@@ -50,7 +49,6 @@ public class FirehoseModule implements DruidModule
 return Arrays.<Module>asList(
 new SimpleModule("FirehoseModule")
 .registerSubtypes(
-new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"),
 new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"),
 new NamedType(ClippedFirehoseFactory.class, "clipped"),
 new NamedType(TimedShutoffFirehoseFactory.class, "timed"),
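With this registration removed from the core FirehoseModule, a spec of type "kafka-0.7.2" now resolves only when the kafka-seven extension, through its KafkaSevenDruidModule above, is on the classpath.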

KafkaFirehoseFactory.java (deleted, 155 lines)

@@ -1,155 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.segment.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.exception.FormattedException;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
*/
public class KafkaFirehoseFactory implements FirehoseFactory
{
private static final Logger log = new Logger(KafkaFirehoseFactory.class);
@JsonProperty
private final Properties consumerProps;
@JsonProperty
private final String feed;
@JsonProperty
private final ByteBufferInputRowParser parser;
@JsonCreator
public KafkaFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed,
@JsonProperty("parser") ByteBufferInputRowParser parser)
{
this.consumerProps = consumerProps;
this.feed = feed;
this.parser = parser;
parser.addDimensionExclusion("feed");
}
@Override
public Firehose connect() throws IOException
{
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1)
{
return null;
}
final KafkaStream<Message> stream = streamList.get(0);
return new DefaultFirehose(connector, stream, parser);
}
private static class DefaultFirehose implements Firehose
{
private final ConsumerConnector connector;
private final Iterator<MessageAndMetadata<Message>> iter;
private final ByteBufferInputRowParser parser;
public DefaultFirehose(ConsumerConnector connector, KafkaStream<Message> stream, ByteBufferInputRowParser parser)
{
iter = stream.iterator();
this.connector = connector;
this.parser = parser;
}
@Override
public boolean hasMore()
{
return iter.hasNext();
}
@Override
public InputRow nextRow() throws FormattedException
{
final Message message = iter.next().message();
if (message == null)
{
return null;
}
return parseMessage(message);
}
public InputRow parseMessage(Message message) throws FormattedException
{
return parser.parse(message.payload());
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
* This is actually not going to do exactly what we want, cause it
* will be called asynchronously after the persist is complete. So,
* it's going to commit that it's processed more than was actually
* persisted. This is unfortunate, but good enough for now. Should
* revisit along with an upgrade of our Kafka version.
*/
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public void close() throws IOException
{
connector.shutdown();
}
}
}