diff --git a/extensions/kafka-seven/pom.xml b/extensions/kafka-seven/pom.xml deleted file mode 100644 index 26a595e0127..00000000000 --- a/extensions/kafka-seven/pom.xml +++ /dev/null @@ -1,66 +0,0 @@ - - - - - 4.0.0 - - io.druid.extensions - druid-kafka-seven - druid-kafka-seven - druid-kafka-seven - - - io.druid - druid - 0.7.1-SNAPSHOT - ../../pom.xml - - - - - io.druid - druid-api - - - - kafka - core-kafka - 0.7.2 - - - log4j - log4j - - - org.jboss.netty - netty - - - - - - - junit - junit - test - - - - diff --git a/extensions/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenDruidModule.java b/extensions/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenDruidModule.java deleted file mode 100644 index d0f7daeed56..00000000000 --- a/extensions/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenDruidModule.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.firehose.kafka; - -import com.fasterxml.jackson.databind.Module; -import com.fasterxml.jackson.databind.jsontype.NamedType; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import io.druid.initialization.DruidModule; - -import java.util.List; - -/** - */ -public class KafkaSevenDruidModule implements DruidModule -{ - @Override - public List getJacksonModules() - { - return ImmutableList.of( - new SimpleModule("KafkaSevenFirehoseModule") - .registerSubtypes( - new NamedType(KafkaSevenFirehoseFactory.class, "kafka-0.7.2") - ) - ); - } - - @Override - public void configure(Binder binder) - { - - } -} diff --git a/extensions/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java b/extensions/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java deleted file mode 100644 index 8b31fc7c7df..00000000000 --- a/extensions/kafka-seven/src/main/java/io/druid/firehose/kafka/KafkaSevenFirehoseFactory.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.firehose.kafka; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Sets; -import com.metamx.common.logger.Logger; -import io.druid.data.input.ByteBufferInputRowParser; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.InputRow; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.Message; -import kafka.message.MessageAndMetadata; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - -/** - */ -public class KafkaSevenFirehoseFactory implements FirehoseFactory -{ - private static final Logger log = new Logger(KafkaSevenFirehoseFactory.class); - - private final Properties consumerProps; - private final String feed; - - @JsonCreator - public KafkaSevenFirehoseFactory( - @JsonProperty("consumerProps") Properties consumerProps, - @JsonProperty("feed") String feed - ) - { - this.consumerProps = consumerProps; - this.feed = feed; - } - - @JsonProperty - public Properties getConsumerProps() - { - return consumerProps; - } - - @JsonProperty - public String getFeed() - { - return feed; - } - - @Override - public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException - { - Set newDimExclus = Sets.union( - firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(), - Sets.newHashSet("feed") - ); - final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec( - firehoseParser.getParseSpec() - .withDimensionsSpec( - firehoseParser.getParseSpec() - .getDimensionsSpec() - .withDimensionExclusions( - newDimExclus - ) - ) - ); - - final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps)); - - final Map>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1)); - - final List> streamList = streams.get(feed); - if (streamList == null || streamList.size() != 1) { - return null; - } - - final KafkaStream stream = streamList.get(0); - final Iterator> iter = stream.iterator(); - - return new Firehose() - { - @Override - public boolean hasMore() - { - return iter.hasNext(); - } - - @Override - public InputRow nextRow() - { - final Message message = iter.next().message(); - - if (message == null) { - return null; - } - - return parseMessage(message); - } - - public InputRow parseMessage(Message message) - { - return theParser.parse(message.payload()); - } - - @Override - public Runnable commit() - { - return new Runnable() - { - @Override - public void run() - { - /* - * This is actually not going to do exactly what we want, cause it - * will be called asynchronously after the persist is complete. So, - * it's going to commit that it's processed more than was actually - * persisted. This is unfortunate, but good enough for now. Should - * revisit along with an upgrade of our Kafka version. - */ - - log.info("committing offsets"); - connector.commitOffsets(); - } - }; - } - - @Override - public void close() throws IOException - { - connector.shutdown(); - } - }; - } -} diff --git a/extensions/kafka-seven/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/kafka-seven/src/main/resources/META-INF/services/io.druid.initialization.DruidModule deleted file mode 100644 index 04c38658083..00000000000 --- a/extensions/kafka-seven/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ /dev/null @@ -1 +0,0 @@ -io.druid.firehose.kafka.KafkaSevenDruidModule \ No newline at end of file diff --git a/pom.xml b/pom.xml index c63b2cc6c48..73456ef8dda 100644 --- a/pom.xml +++ b/pom.xml @@ -92,9 +92,6 @@ extensions/cassandra-storage extensions/hdfs-storage extensions/s3-extensions - - extensions/kafka-eight extensions/kafka-eight-simpleConsumer extensions/rabbitmq