Merge pull request #2369 from du00cs/master

[Feature] Extension: Offline Ingestion with limited Parquet Support
Fangjin Yang committed on 2016-03-29 23:19:35 -07:00 · commit a8b28879f1
14 changed files with 672 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
# Parquet
This extension enables Druid to ingest and understand the Apache Parquet data format offline.
## Parquet Hadoop Parser
This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.parquet.DruidParquetInputFormat"`. Also make sure to include `io.druid.extensions:druid-avro-extensions` as an extension.
Field | Type | Description | Required
----------|-------------|----------------------------------------------------------------------------------------|---------
type | String | This should say `parquet` | yes
parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes
For example:
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "no_metrics"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "tmp/segments"
},
"dataSchema": {
"dataSource": "no_metrics",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"name"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [{
"type": "count",
"name": "count"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "ALL",
"intervals": ["2015-12-31/2016-01-02"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {},
"leaveIntermediate": true
}
}
}
```
Almost all of the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` (for example, `-Duser.timezone=UTC` in the mapper and reducer JVM options) so that HDFS segment paths do not depend on the local timezone.
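For instance, the example specs bundled with this commit pin the task JVM timezone; a minimal sketch of such `jobProperties` (trim the JVM options to suit your cluster):
```json
"jobProperties" : {
  "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8",
  "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8"
}
```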
There is no need to upgrade your cluster to a SNAPSHOT build; you can simply fire a Hadoop job with your locally compiled jars, for example:
```bash
HADOOP_CLASS_PATH=`hadoop classpath | sed s/*.jar/*/g`
java -Xmx32m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
-classpath config/overlord:config/_common:lib/*:$HADOOP_CLASS_PATH:extensions/druid-avro-extensions/* \
io.druid.cli.Main index hadoop \
wikipedia_hadoop_parquet_job.json
```

View File

@@ -45,6 +45,7 @@ If you'd like to take on maintenance for a community extension, please post on [
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
|druid-parquet-extensions|Support for data in the Apache Parquet data format.|[link](../development/extensions-contrib/parquet.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|

View File

@@ -0,0 +1,64 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "no_metrics"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "tmp/segments"
},
"dataSchema": {
"dataSource": "no_metrics",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"name"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [{
"type": "count",
"name": "count"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "ALL",
"intervals": ["2015-12-31/2016-01-02"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@@ -0,0 +1,75 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "wikipedia.gz.parquet"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [{
"type": "count",
"name": "count"
}, {
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
}, {
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-30/2013-09-02"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@@ -0,0 +1,58 @@
{
"type" : "index_hadoop",
"spec" : {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
"paths": "wikipedia_list.parquet"
}
},
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "parquet",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2013-08-31/2013-09-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"partitionsSpec": {
"targetPartitionSize": 5000000
}
}
}
}

View File

@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.9.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>druid-parquet-extensions</artifactId>
<dependencies>
<dependency>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-avro-extensions</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,31 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.parquet;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.DruidParquetReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
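/**
 * Hadoop InputFormat that reads Parquet files into Avro GenericRecords, using
 * DruidParquetReadSupport so that only the columns referenced by the ingestion
 * spec are materialized.
 */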
public class DruidParquetInputFormat extends ParquetInputFormat<GenericRecord>
{
public DruidParquetInputFormat()
{
super(DruidParquetReadSupport.class);
}
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.parquet;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
public class ParquetExtensionsModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
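    // Register the parser under the type name "parquet", as referenced by parser.type in ingestion specs.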
return Arrays.asList(
        new SimpleModule("ParquetInputRowParserModule")
.registerSubtypes(
new NamedType(ParquetHadoopInputRowParser.class, "parquet")
)
);
}
@Override
public void configure(Binder binder)
{ }
}

View File

@@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.parquet;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import io.druid.data.input.AvroStreamInputRowParser;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.avro.GenericRecordAsMap;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.apache.avro.generic.GenericRecord;
import org.joda.time.DateTime;
import java.util.List;
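/**
 * InputRowParser that turns the Avro GenericRecords emitted by DruidParquetInputFormat
 * into Druid InputRows according to a timeAndDims parseSpec.
 */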
public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord>
{
private final ParseSpec parseSpec;
private final List<String> dimensions;
@JsonCreator
public ParquetHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec
)
{
this.parseSpec = parseSpec;
List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
this.dimensions = Lists.newArrayList();
for (DimensionSchema dim : dimensionSchema) {
this.dimensions.add(dim.getName());
}
}
  /**
   * Imitates the avro extension's {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean)}.
   */
@Override
public InputRow parse(GenericRecord record)
{
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false);
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
}
@JsonProperty
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new ParquetHadoopInputRowParser(parseSpec);
}
}

View File

@@ -0,0 +1,102 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.avro;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.druid.data.input.impl.DimensionSchema;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
{
private MessageType getPartialReadSchema(InitContext context)
{
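    // Project the full Parquet file schema down to the columns the ingestion spec actually needs.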
MessageType fullSchema = context.getFileSchema();
String name = fullSchema.getName();
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
String tsField = config.getParser().getParseSpec().getTimestampSpec().getTimestampColumn();
List<DimensionSchema> dimensionSchema = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
Set<String> dimensions = Sets.newHashSet();
for (DimensionSchema dim : dimensionSchema) {
dimensions.add(dim.getName());
}
Set<String> metricsFields = Sets.newHashSet();
for (AggregatorFactory agg : config.getSchema().getDataSchema().getAggregators()) {
metricsFields.addAll(agg.requiredFields());
}
List<Type> partialFields = Lists.newArrayList();
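    // Keep a column if it is the timestamp, an input to a metric, or a listed dimension; if no dimensions are listed, keep every column.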
for (Type type : fullSchema.getFields()) {
if (tsField.equals(type.getName())
|| metricsFields.contains(type.getName())
|| dimensions.size() > 0 && dimensions.contains(type.getName())
|| dimensions.size() == 0) {
partialFields.add(type);
}
}
return new MessageType(name, partialFields);
}
public ReadContext init(InitContext context)
{
MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), getPartialReadSchema(context));
return new ReadContext(requestedProjection);
}
@Override
public RecordMaterializer<GenericRecord> prepareForRead(
Configuration configuration, Map<String, String> keyValueMetaData,
MessageType fileSchema, ReadContext readContext
)
{
MessageType parquetSchema = readContext.getRequestedSchema();
Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
Class<? extends AvroDataSupplier> suppClass = configuration.getClass(
AVRO_DATA_SUPPLIER,
SpecificDataSupplier.class,
AvroDataSupplier.class
);
AvroDataSupplier supplier = ReflectionUtils.newInstance(suppClass, configuration);
return new AvroRecordMaterializer<GenericRecord>(parquetSchema, avroSchema, supplier.get());
}
}

View File

@@ -0,0 +1 @@
io.druid.data.input.parquet.ParquetExtensionsModule

View File

@@ -0,0 +1,76 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.parquet;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
public class DruidParquetInputFormatTest
{
@Test
public void test() throws IOException, InterruptedException
{
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
"example/wikipedia_hadoop_parquet_job.json"));
config.intoConfiguration(job);
File testFile = new File("example/wikipedia_list.parquet");
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
InputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration());
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
reader.initialize(split, context);
reader.nextKeyValue();
GenericRecord data = (GenericRecord) reader.getCurrentValue();
// field not read, should return null
assertEquals(data.get("added"), null);
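    // "page" is a declared dimension in the job spec, so it is read and materialized.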
assertEquals(data.get("page"), new Utf8("Gypsy Danger"));
reader.close();
}
}

View File

@@ -102,6 +102,7 @@
<module>extensions-contrib/kafka-eight-simpleConsumer</module>
<module>extensions-contrib/rabbitmq</module>
<module>extensions-contrib/distinctcount</module>
<module>extensions-contrib/parquet-extensions</module>
<!-- distribution packaging -->
<module>distribution</module>