Merge pull request #2172 from gianm/remove-kafka-seven

Remove unused kafka-seven extension.
This commit is contained in:
Bingkun Guo 2015-12-29 15:19:28 -06:00
commit 492adeaaa7
5 changed files with 0 additions and 277 deletions

View File

@ -1,66 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-kafka-seven</artifactId>
<name>druid-kafka-seven</name>
<description>druid-kafka-seven</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.7.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
</dependency>
<dependency>
<!-- Kafka 0.7 is not on Maven Central, so you will have to
replace this with your own Kafka 0.7 artifact -->
<groupId>kafka</groupId>
<artifactId>core-kafka</artifactId>
<version>0.7.2</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,49 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.kafka;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
/**
*/
public class KafkaSevenDruidModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule("KafkaSevenFirehoseModule")
.registerSubtypes(
new NamedType(KafkaSevenFirehoseFactory.class, "kafka-0.7.2")
)
);
}
@Override
public void configure(Binder binder)
{
}
}

View File

@ -1,158 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.firehose.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.metamx.common.logger.Logger;
import io.druid.data.input.ByteBufferInputRowParser;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
*/
public class KafkaSevenFirehoseFactory implements FirehoseFactory<ByteBufferInputRowParser>
{
private static final Logger log = new Logger(KafkaSevenFirehoseFactory.class);
private final Properties consumerProps;
private final String feed;
@JsonCreator
public KafkaSevenFirehoseFactory(
@JsonProperty("consumerProps") Properties consumerProps,
@JsonProperty("feed") String feed
)
{
this.consumerProps = consumerProps;
this.feed = feed;
}
@JsonProperty
public Properties getConsumerProps()
{
return consumerProps;
}
@JsonProperty
public String getFeed()
{
return feed;
}
@Override
public Firehose connect(final ByteBufferInputRowParser firehoseParser) throws IOException
{
Set<String> newDimExclus = Sets.union(
firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(),
Sets.newHashSet("feed")
);
final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec(
firehoseParser.getParseSpec()
.withDimensionsSpec(
firehoseParser.getParseSpec()
.getDimensionsSpec()
.withDimensionExclusions(
newDimExclus
)
)
);
final ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps));
final Map<String, List<KafkaStream<Message>>> streams = connector.createMessageStreams(ImmutableMap.of(feed, 1));
final List<KafkaStream<Message>> streamList = streams.get(feed);
if (streamList == null || streamList.size() != 1) {
return null;
}
final KafkaStream<Message> stream = streamList.get(0);
final Iterator<MessageAndMetadata<Message>> iter = stream.iterator();
return new Firehose()
{
@Override
public boolean hasMore()
{
return iter.hasNext();
}
@Override
public InputRow nextRow()
{
final Message message = iter.next().message();
if (message == null) {
return null;
}
return parseMessage(message);
}
public InputRow parseMessage(Message message)
{
return theParser.parse(message.payload());
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
/*
* This is actually not going to do exactly what we want, cause it
* will be called asynchronously after the persist is complete. So,
* it's going to commit that it's processed more than was actually
* persisted. This is unfortunate, but good enough for now. Should
* revisit along with an upgrade of our Kafka version.
*/
log.info("committing offsets");
connector.commitOffsets();
}
};
}
@Override
public void close() throws IOException
{
connector.shutdown();
}
};
}
}

View File

@ -1 +0,0 @@
io.druid.firehose.kafka.KafkaSevenDruidModule

View File

@ -92,9 +92,6 @@
<module>extensions/cassandra-storage</module> <module>extensions/cassandra-storage</module>
<module>extensions/hdfs-storage</module> <module>extensions/hdfs-storage</module>
<module>extensions/s3-extensions</module> <module>extensions/s3-extensions</module>
<!-- kafka-seven extension is not built by default since
Kafka 0.7 is not available in Maven Central -->
<!-- <module>extensions/kafka&#45;seven</module> -->
<module>extensions/kafka-eight</module> <module>extensions/kafka-eight</module>
<module>extensions/kafka-eight-simpleConsumer</module> <module>extensions/kafka-eight-simpleConsumer</module>
<module>extensions/rabbitmq</module> <module>extensions/rabbitmq</module>