diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml new file mode 100644 index 00000000000..6a2a74ccd70 --- /dev/null +++ b/kafka-seven/pom.xml @@ -0,0 +1,59 @@ + + + + + 4.0.0 + io.druid.extensions + druid-kafka-seven + druid-kafka-seven + druid-kafka-seven + + + io.druid + druid + 0.6.11-SNAPSHOT + + + + + io.druid + druid-api + + + kafka + core-kafka + 0.7.2-mmx1 + + + log4j + log4j + + + + + + + junit + junit + test + + + diff --git a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenDruidModule.java b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenDruidModule.java new file mode 100644 index 00000000000..11682a669aa --- /dev/null +++ b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenDruidModule.java @@ -0,0 +1,51 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.firehose.kafka; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +/** + * Druid extension module for the Kafka 0.7.2 firehose. It contributes one Jackson module, named "KafkaSevenFirehoseModule", that registers {@link KafkaSevenFirehoseFactory} as a subtype under the name "kafka-0.7.2"; it adds no Guice bindings. + * + * NOTE(review): getJacksonModules() reads as returning a raw List in this listing; generic type arguments appear to have been stripped, so confirm the declared return type against the original file. + */ +public class KafkaSevenDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return Arrays.asList( + new SimpleModule("KafkaSevenFirehoseModule") + .registerSubtypes( + new NamedType(KafkaSevenFirehoseFactory.class, "kafka-0.7.2") + ) + ); + } + /** + * Contributes no bindings: the method body is empty. + * + * @param binder the Guice binder; unused + */ + @Override + public void configure(Binder binder) + { + + } +} diff --git a/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java new file mode 100644 index 00000000000..8f200d1cdbc --- /dev/null +++ b/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java @@ -0,0 +1,155 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
+ */ + +package io.druid.firehose.kafka; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import com.metamx.common.exception.FormattedException; +import com.metamx.common.logger.Logger; +import io.druid.data.input.ByteBufferInputRowParser; +import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; +import io.druid.data.input.InputRow; +import kafka.consumer.Consumer; +import kafka.consumer.ConsumerConfig; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import kafka.message.Message; +import kafka.message.MessageAndMetadata; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +/** + * {@link FirehoseFactory} that builds a {@link Firehose} on top of a Kafka {@link ConsumerConnector}. + * The payload of each message read from the stream requested for {@code feed} is handed to the + * configured {@link ByteBufferInputRowParser}. Instances are created by Jackson from the + * {@code consumerProps}, {@code feed} and {@code parser} JSON properties. + * + * NOTE(review): generic type arguments appear to have been stripped from this listing (for example + * the stream map, list and iterator declarations in connect()); confirm against the original file. + */ +public class KafkaSevenFirehoseFactory implements FirehoseFactory +{ + private static final Logger log = new Logger(KafkaSevenFirehoseFactory.class); + + private final Properties consumerProps; /* handed to ConsumerConfig in connect() */ + private final String feed; /* key used when requesting the message stream in connect() */ + private final ByteBufferInputRowParser parser; /* parses each message payload into an InputRow */ + /** + * Creates the factory from its JSON properties. Besides storing the arguments, this calls + * {@code parser.addDimensionExclusion("feed")}. + * + * NOTE(review): parser is dereferenced here without a null check, so a missing "parser" + * property fails with a NullPointerException at construction time. + * + * @param consumerProps properties used to build the Kafka {@link ConsumerConfig} + * @param feed name passed to the connector when requesting the message stream + * @param parser parser applied to each message payload + */ + @JsonCreator + public KafkaSevenFirehoseFactory( + @JsonProperty("consumerProps") Properties consumerProps, + @JsonProperty("feed") String feed, + @JsonProperty("parser") ByteBufferInputRowParser parser + ) + { + this.consumerProps = consumerProps; + this.feed = feed; + this.parser = parser; + + parser.addDimensionExclusion("feed"); + } + /** + * @return the consumer properties this factory was constructed with + */ + @JsonProperty + public Properties getConsumerProps() + { + return consumerProps; + } + /** + * @return the feed name this factory was constructed with + */ + @JsonProperty + public String getFeed() + { + return feed; + } + /** + * @return the parser this factory was constructed with + */ + @JsonProperty + public ByteBufferInputRowParser getParser() + { + return parser; + } + /** + * Creates a consumer connector from {@code consumerProps}, asks it for one stream for + * {@code feed} and wraps the iterator of that stream in a {@link Firehose}. + * + * @return the firehose, or null when the connector did not return exactly one stream for {@code feed} + * @throws IOException declared in the signature; no explicit throw is visible in this body + */ + @Override + public Firehose connect() throws IOException + { + final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); + + final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); /* request a single stream for feed */ + + final List> streamList 
= streams.get(feed); + if (streamList == null || streamList.size() != 1) { /* NOTE(review): the connector created above is not shut down on this path and the caller receives null; confirm this is intended */ + return null; + } + + final KafkaStream stream = streamList.get(0); + final Iterator> iter = stream.iterator(); + + return new Firehose() + { /* Firehose over the single stream; its methods use the iter, parser and connector captured from the enclosing scope. */ + @Override + public boolean hasMore() + { + return iter.hasNext(); + } + /** + * Takes the next element from the stream iterator and parses its message. + * + * @return the parsed row, or null when the element carries a null message + */ + @Override + public InputRow nextRow() throws FormattedException + { + final Message message = iter.next().message(); + + if (message == null) { + return null; + } + + return parseMessage(message); + } + /** + * Parses the payload of the given message with the factory parser. + */ + public InputRow parseMessage(Message message) throws FormattedException + { + return parser.parse(message.payload()); + } + /** + * @return a Runnable that logs "committing offsets" and calls connector.commitOffsets() when run + */ + @Override + public Runnable commit() + { + return new Runnable() + { + @Override + public void run() + { + /* + * This is actually not going to do exactly what we want, because it + * will be called asynchronously after the persist is complete. So, + * it's going to commit that it's processed more than was actually + * persisted. This is unfortunate, but good enough for now. Should + * revisit along with an upgrade of our Kafka version. 
+ */ + + log.info("committing offsets"); + connector.commitOffsets(); + } + }; + } + /** + * Shuts down the consumer connector. + */ + @Override + public void close() throws IOException + { + connector.shutdown(); + } + }; + } +} diff --git a/kafka-seven/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/kafka-seven/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100644 index 00000000000..04c38658083 --- /dev/null +++ b/kafka-seven/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.firehose.kafka.KafkaSevenDruidModule \ No newline at end of file diff --git a/pom.xml b/pom.xml index 45e1c5d7f5e..72b65ff271d 100644 --- a/pom.xml +++ b/pom.xml @@ -48,12 +48,14 @@ examples indexing-hadoop indexing-service + processing server services - processing + cassandra-storage hdfs-storage s3-extensions + kafka-seven @@ -365,17 +367,6 @@ aether-api 0.9.0.M2 - - kafka - core-kafka - 0.7.2-mmx1 - - - log4j - log4j - - - com.rabbitmq amqp-client diff --git a/s3-extensions/src/main/java/io/druid/firehose/s3/S3FirehoseDruidModule.java b/s3-extensions/src/main/java/io/druid/firehose/s3/S3FirehoseDruidModule.java new file mode 100644 index 00000000000..d6ce7f28548 --- /dev/null +++ b/s3-extensions/src/main/java/io/druid/firehose/s3/S3FirehoseDruidModule.java @@ -0,0 +1,48 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.firehose.s3; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.List; + +/** + * Druid module that contributes one unnamed Jackson {@link SimpleModule} registering StaticS3FirehoseFactory as a subtype under the name "static-s3"; it adds no Guice bindings. + * + * NOTE(review): getJacksonModules() reads as returning a raw List in this listing; generic type arguments appear to have been stripped, so confirm the declared return type against the original file. + */ +public class S3FirehoseDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule().registerSubtypes(new NamedType(StaticS3FirehoseFactory.class, "static-s3")) + ); + } + /** + * Contributes no bindings: the method body is empty. + * + * @param binder the Guice binder; unused + */ + @Override + public void configure(Binder binder) + { + + } +} diff --git a/s3-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/s3-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule index b060044ef94..2900ba1cbf3 100644 --- a/s3-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ b/s3-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -1 +1,2 @@ -io.druid.storage.s3.S3StorageDruidModule \ No newline at end of file +io.druid.storage.s3.S3StorageDruidModule +io.druid.firehose.s3.S3FirehoseDruidModule \ No newline at end of file diff --git a/server/pom.xml b/server/pom.xml index 51722713646..a17e1bbef30 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -183,10 +183,6 @@ org.eclipse.jetty jetty-servlets - - kafka - core-kafka - com.rabbitmq amqp-client diff --git a/server/src/main/java/io/druid/guice/FirehoseModule.java b/server/src/main/java/io/druid/guice/FirehoseModule.java index acaa370a18d..8a7480fd22c 100644 --- a/server/src/main/java/io/druid/guice/FirehoseModule.java +++ 
b/server/src/main/java/io/druid/guice/FirehoseModule.java @@ -27,7 +27,6 @@ import io.druid.data.input.ProtoBufInputRowParser; import io.druid.initialization.DruidModule; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.IrcFirehoseFactory; -import io.druid.segment.realtime.firehose.KafkaFirehoseFactory; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.firehose.RabbitMQFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; @@ -50,7 +49,6 @@ public class FirehoseModule implements DruidModule return Arrays.asList( new SimpleModule("FirehoseModule") .registerSubtypes( - new NamedType(KafkaFirehoseFactory.class, "kafka-0.7.2"), new NamedType(RabbitMQFirehoseFactory.class, "rabbitmq"), new NamedType(ClippedFirehoseFactory.class, "clipped"), new NamedType(TimedShutoffFirehoseFactory.class, "timed"), diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/KafkaFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/KafkaFirehoseFactory.java deleted file mode 100644 index 5fa1b252661..00000000000 --- a/server/src/main/java/io/druid/segment/realtime/firehose/KafkaFirehoseFactory.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012, 2013 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. 
- * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package io.druid.segment.realtime.firehose; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableMap; -import com.metamx.common.exception.FormattedException; -import com.metamx.common.logger.Logger; -import io.druid.data.input.ByteBufferInputRowParser; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; -import kafka.message.MessageAndMetadata; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - */ -public class KafkaFirehoseFactory implements FirehoseFactory -{ - private static final Logger log = new Logger(KafkaFirehoseFactory.class); - - @JsonProperty - private final Properties consumerProps; - - @JsonProperty - private final String feed; - - @JsonProperty - private final ByteBufferInputRowParser parser; - - @JsonCreator - public KafkaFirehoseFactory( - @JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("feed") String feed, - @JsonProperty("parser") ByteBufferInputRowParser parser) - { - this.consumerProps = consumerProps; - this.feed = feed; - this.parser = parser; - - parser.addDimensionExclusion("feed"); - } - - @Override - public Firehose connect() throws IOException - { - final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); - - final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); - - 
final List> streamList = streams.get(feed); - if (streamList == null || streamList.size() != 1) - { - return null; - } - - final KafkaStream stream = streamList.get(0); - - return new DefaultFirehose(connector, stream, parser); - } - - private static class DefaultFirehose implements Firehose - { - private final ConsumerConnector connector; - private final Iterator> iter; - private final ByteBufferInputRowParser parser; - - public DefaultFirehose(ConsumerConnector connector, KafkaStream stream, ByteBufferInputRowParser parser) - { - iter = stream.iterator(); - this.connector = connector; - this.parser = parser; - } - - @Override - public boolean hasMore() - { - return iter.hasNext(); - } - - @Override - public InputRow nextRow() throws FormattedException - { - final Message message = iter.next().message(); - - if (message == null) - { - return null; - } - - return parseMessage(message); - } - - public InputRow parseMessage(Message message) throws FormattedException - { - return parser.parse(message.payload()); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - /* - * This is actually not going to do exactly what we want, cause it - * will be called asynchronously after the persist is complete. So, - * it's going to commit that it's processed more than was actually - * persisted. This is unfortunate, but good enough for now. Should - * revisit along with an upgrade of our Kafka version. - */ - - log.info("committing offsets"); - connector.commitOffsets(); - } - }; - } - - @Override - public void close() throws IOException - { - connector.shutdown(); - } - } -}