mirror of https://github.com/apache/druid.git
Reorganize the code folder according to recent upstream folder changes, separate it from the Avro code, and move it into extensions-contrib. Docs rewritten too.
This commit is contained in:
parent
95733a362f
commit
0f67ff7dfb
|
@ -0,0 +1,90 @@
|
||||||
|
# Parquet
|
||||||
|
|
||||||
|
This extension enables Druid to ingest and understand the Apache Parquet data format offline.
|
||||||
|
|
||||||
|
## Parquet Hadoop Parser
|
||||||
|
|
||||||
|
This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.parquet.DruidParquetInputFormat"`. Make sure to also include `io.druid.extensions:druid-avro-extensions` as an extension.
|
||||||
|
|
||||||
|
Field | Type | Description | Required
|
||||||
|
----------|-------------|----------------------------------------------------------------------------------------|---------
|
||||||
|
type | String | This should say `parquet` | yes
|
||||||
|
parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes
|
||||||
|
|
||||||
|
For example:
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"type": "index_hadoop",
|
||||||
|
"spec": {
|
||||||
|
"ioConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"inputSpec": {
|
||||||
|
"type": "static",
|
||||||
|
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
|
||||||
|
"paths": "no_metrics"
|
||||||
|
},
|
||||||
|
"metadataUpdateSpec": {
|
||||||
|
"type": "postgresql",
|
||||||
|
"connectURI": "jdbc:postgresql://localhost/druid",
|
||||||
|
"user" : "druid",
|
||||||
|
"password" : "asdf",
|
||||||
|
"segmentTable": "druid_segments"
|
||||||
|
},
|
||||||
|
"segmentOutputPath": "tmp/segments"
|
||||||
|
},
|
||||||
|
"dataSchema": {
|
||||||
|
"dataSource": "no_metrics",
|
||||||
|
"parser": {
|
||||||
|
"type": "parquet",
|
||||||
|
"parseSpec": {
|
||||||
|
"format": "timeAndDims",
|
||||||
|
"timestampSpec": {
|
||||||
|
"column": "time",
|
||||||
|
"format": "auto"
|
||||||
|
},
|
||||||
|
"dimensionsSpec": {
|
||||||
|
"dimensions": [
|
||||||
|
"name"
|
||||||
|
],
|
||||||
|
"dimensionExclusions": [],
|
||||||
|
"spatialDimensions": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"metricsSpec": [{
|
||||||
|
"type": "count",
|
||||||
|
"name": "count"
|
||||||
|
}],
|
||||||
|
"granularitySpec": {
|
||||||
|
"type": "uniform",
|
||||||
|
"segmentGranularity": "DAY",
|
||||||
|
"queryGranularity": "ALL",
|
||||||
|
"intervals": ["2015-12-31/2016-01-02"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tuningConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"workingPath": "tmp/working_path",
|
||||||
|
"partitionsSpec": {
|
||||||
|
"targetPartitionSize": 5000000
|
||||||
|
},
|
||||||
|
"jobProperties" : {},
|
||||||
|
"leaveIntermediate": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
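Almost all of the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` (for example, add `-Duser.timezone=UTC` to `mapreduce.map.java.opts` and `mapreduce.reduce.java.opts`) so that HDFS paths do not depend on the timezone.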
|
||||||
|
|
||||||
|
There is no need to upgrade your cluster to a SNAPSHOT build; you can simply launch a Hadoop job with your locally compiled jars, for example:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
HADOOP_CLASS_PATH=`hadoop classpath | sed s/*.jar/*/g`
|
||||||
|
|
||||||
|
java -Xmx32m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
|
||||||
|
-classpath config/overlord:config/_common:lib/*:$HADOOP_CLASS_PATH:extensions/druid-avro-extensions/* \
|
||||||
|
io.druid.cli.Main index hadoop \
|
||||||
|
wikipedia_hadoop_parquet_job.json
|
||||||
|
```
|
|
@ -45,6 +45,7 @@ If you'd like to take on maintenance for a community extension, please post on [
|
||||||
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|
||||||
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|
||||||
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
|
||||||
|
|druid-parquet-extensions|Support for data in Apache Parquet data format.|[link](../development/extensions-contrib/parquet.html)|
|
||||||
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|
||||||
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|
||||||
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
{
|
||||||
|
"type": "index_hadoop",
|
||||||
|
"spec": {
|
||||||
|
"ioConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"inputSpec": {
|
||||||
|
"type": "static",
|
||||||
|
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
|
||||||
|
"paths": "no_metrics"
|
||||||
|
},
|
||||||
|
"metadataUpdateSpec": {
|
||||||
|
"type": "postgresql",
|
||||||
|
"connectURI": "jdbc:postgresql://localhost/druid",
|
||||||
|
"user" : "druid",
|
||||||
|
"password" : "asdf",
|
||||||
|
"segmentTable": "druid_segments"
|
||||||
|
},
|
||||||
|
"segmentOutputPath": "tmp/segments"
|
||||||
|
},
|
||||||
|
"dataSchema": {
|
||||||
|
"dataSource": "no_metrics",
|
||||||
|
"parser": {
|
||||||
|
"type": "parquet",
|
||||||
|
"parseSpec": {
|
||||||
|
"format": "timeAndDims",
|
||||||
|
"timestampSpec": {
|
||||||
|
"column": "time",
|
||||||
|
"format": "auto"
|
||||||
|
},
|
||||||
|
"dimensionsSpec": {
|
||||||
|
"dimensions": [
|
||||||
|
"name"
|
||||||
|
],
|
||||||
|
"dimensionExclusions": [],
|
||||||
|
"spatialDimensions": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"metricsSpec": [{
|
||||||
|
"type": "count",
|
||||||
|
"name": "count"
|
||||||
|
}],
|
||||||
|
"granularitySpec": {
|
||||||
|
"type": "uniform",
|
||||||
|
"segmentGranularity": "DAY",
|
||||||
|
"queryGranularity": "ALL",
|
||||||
|
"intervals": ["2015-12-31/2016-01-02"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tuningConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"workingPath": "tmp/working_path",
|
||||||
|
"partitionsSpec": {
|
||||||
|
"targetPartitionSize": 5000000
|
||||||
|
},
|
||||||
|
"jobProperties" : {
|
||||||
|
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
|
||||||
|
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
|
||||||
|
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
|
||||||
|
},
|
||||||
|
"leaveIntermediate": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
{
|
||||||
|
"type": "index_hadoop",
|
||||||
|
"spec": {
|
||||||
|
"ioConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"inputSpec": {
|
||||||
|
"type": "static",
|
||||||
|
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
|
||||||
|
"paths": "wikipedia.gz.parquet"
|
||||||
|
},
|
||||||
|
"metadataUpdateSpec": {
|
||||||
|
"type": "postgresql",
|
||||||
|
"connectURI": "jdbc:postgresql://localhost/druid",
|
||||||
|
"user" : "druid",
|
||||||
|
"password" : "asdf",
|
||||||
|
"segmentTable": "druid_segments"
|
||||||
|
},
|
||||||
|
"segmentOutputPath": "/tmp/segments"
|
||||||
|
},
|
||||||
|
"dataSchema": {
|
||||||
|
"dataSource": "wikipedia",
|
||||||
|
"parser": {
|
||||||
|
"type": "parquet",
|
||||||
|
"parseSpec": {
|
||||||
|
"format": "timeAndDims",
|
||||||
|
"timestampSpec": {
|
||||||
|
"column": "timestamp",
|
||||||
|
"format": "auto"
|
||||||
|
},
|
||||||
|
"dimensionsSpec": {
|
||||||
|
"dimensions": [
|
||||||
|
"page",
|
||||||
|
"language",
|
||||||
|
"user",
|
||||||
|
"unpatrolled"
|
||||||
|
],
|
||||||
|
"dimensionExclusions": [],
|
||||||
|
"spatialDimensions": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"metricsSpec": [{
|
||||||
|
"type": "count",
|
||||||
|
"name": "count"
|
||||||
|
}, {
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "deleted",
|
||||||
|
"fieldName": "deleted"
|
||||||
|
}, {
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "delta",
|
||||||
|
"fieldName": "delta"
|
||||||
|
}],
|
||||||
|
"granularitySpec": {
|
||||||
|
"type": "uniform",
|
||||||
|
"segmentGranularity": "DAY",
|
||||||
|
"queryGranularity": "NONE",
|
||||||
|
"intervals": ["2013-08-30/2013-09-02"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tuningConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"workingPath": "tmp/working_path",
|
||||||
|
"partitionsSpec": {
|
||||||
|
"targetPartitionSize": 5000000
|
||||||
|
},
|
||||||
|
"jobProperties" : {
|
||||||
|
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
|
||||||
|
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
|
||||||
|
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
|
||||||
|
},
|
||||||
|
"leaveIntermediate": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,58 @@
|
||||||
|
{
|
||||||
|
"type" : "index_hadoop",
|
||||||
|
"spec" : {
|
||||||
|
"ioConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"inputSpec": {
|
||||||
|
"type": "static",
|
||||||
|
"inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
|
||||||
|
"paths": "wikipedia_list.parquet"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"dataSchema": {
|
||||||
|
"dataSource": "wikipedia",
|
||||||
|
"parser": {
|
||||||
|
"type": "parquet",
|
||||||
|
"parseSpec": {
|
||||||
|
"format": "timeAndDims",
|
||||||
|
"timestampSpec": {
|
||||||
|
"column": "timestamp",
|
||||||
|
"format": "auto"
|
||||||
|
},
|
||||||
|
"dimensionsSpec": {
|
||||||
|
"dimensions": [
|
||||||
|
"page",
|
||||||
|
"language",
|
||||||
|
"user"
|
||||||
|
],
|
||||||
|
"dimensionExclusions": [],
|
||||||
|
"spatialDimensions": []
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"metricsSpec": [
|
||||||
|
{
|
||||||
|
"type": "count",
|
||||||
|
"name": "count"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "doubleSum",
|
||||||
|
"name": "added",
|
||||||
|
"fieldName": "added"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"granularitySpec": {
|
||||||
|
"type": "uniform",
|
||||||
|
"segmentGranularity": "DAY",
|
||||||
|
"queryGranularity": "NONE",
|
||||||
|
"intervals": ["2013-08-31/2013-09-01"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"tuningConfig": {
|
||||||
|
"type": "hadoop",
|
||||||
|
"partitionsSpec": {
|
||||||
|
"targetPartitionSize": 5000000
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Binary file not shown.
|
@ -0,0 +1,45 @@
|
||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<parent>
|
||||||
|
<artifactId>druid</artifactId>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<version>0.9.1-SNAPSHOT</version>
|
||||||
|
<relativePath>../../pom.xml</relativePath>
|
||||||
|
</parent>
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<artifactId>druid-parquet-extensions</artifactId>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid.extensions</groupId>
|
||||||
|
<artifactId>druid-avro-extensions</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.parquet</groupId>
|
||||||
|
<artifactId>parquet-avro</artifactId>
|
||||||
|
<version>1.8.0</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.druid</groupId>
|
||||||
|
<artifactId>druid-indexing-hadoop</artifactId>
|
||||||
|
<version>${project.parent.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.hadoop</groupId>
|
||||||
|
<artifactId>hadoop-client</artifactId>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
|
@ -0,0 +1,31 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.druid.data.input.parquet;
|
||||||
|
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.parquet.avro.DruidParquetReadSupport;
|
||||||
|
import org.apache.parquet.hadoop.ParquetInputFormat;
|
||||||
|
|
||||||
|
public class DruidParquetInputFormat extends ParquetInputFormat<GenericRecord>
|
||||||
|
{
|
||||||
|
public DruidParquetInputFormat()
|
||||||
|
{
|
||||||
|
super(DruidParquetReadSupport.class);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.data.input.parquet;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.Module;
|
||||||
|
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||||
|
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import io.druid.initialization.DruidModule;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ParquetExtensionsModule implements DruidModule
|
||||||
|
{
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<? extends Module> getJacksonModules()
|
||||||
|
{
|
||||||
|
return Arrays.asList(
|
||||||
|
new SimpleModule("ParuqetInputRowParserModule")
|
||||||
|
.registerSubtypes(
|
||||||
|
new NamedType(ParquetHadoopInputRowParser.class, "parquet")
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{ }
|
||||||
|
}
|
|
@ -0,0 +1,80 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.druid.data.input.parquet;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import io.druid.data.input.AvroStreamInputRowParser;
|
||||||
|
import io.druid.data.input.InputRow;
|
||||||
|
import io.druid.data.input.MapBasedInputRow;
|
||||||
|
import io.druid.data.input.avro.GenericRecordAsMap;
|
||||||
|
import io.druid.data.input.impl.DimensionSchema;
|
||||||
|
import io.druid.data.input.impl.InputRowParser;
|
||||||
|
import io.druid.data.input.impl.ParseSpec;
|
||||||
|
import io.druid.data.input.impl.TimestampSpec;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord>
|
||||||
|
{
|
||||||
|
private final ParseSpec parseSpec;
|
||||||
|
private final List<String> dimensions;
|
||||||
|
|
||||||
|
@JsonCreator
|
||||||
|
public ParquetHadoopInputRowParser(
|
||||||
|
@JsonProperty("parseSpec") ParseSpec parseSpec
|
||||||
|
)
|
||||||
|
{
|
||||||
|
this.parseSpec = parseSpec;
|
||||||
|
|
||||||
|
List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
|
||||||
|
this.dimensions = Lists.newArrayList();
|
||||||
|
for (DimensionSchema dim : dimensionSchema) {
|
||||||
|
this.dimensions.add(dim.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* imitate avro extension {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean)}
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public InputRow parse(GenericRecord record)
|
||||||
|
{
|
||||||
|
GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false);
|
||||||
|
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
|
||||||
|
DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
|
||||||
|
return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
@JsonProperty
|
||||||
|
@Override
|
||||||
|
public ParseSpec getParseSpec()
|
||||||
|
{
|
||||||
|
return parseSpec;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InputRowParser withParseSpec(ParseSpec parseSpec)
|
||||||
|
{
|
||||||
|
return new ParquetHadoopInputRowParser(parseSpec);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.parquet.avro;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
|
import io.druid.data.input.impl.DimensionSchema;
|
||||||
|
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||||
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||||
|
import org.apache.avro.Schema;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.parquet.hadoop.api.InitContext;
|
||||||
|
import org.apache.parquet.io.api.RecordMaterializer;
|
||||||
|
import org.apache.parquet.schema.MessageType;
|
||||||
|
import org.apache.parquet.schema.Type;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
|
||||||
|
{
|
||||||
|
private MessageType getPartialReadSchema(InitContext context)
|
||||||
|
{
|
||||||
|
MessageType fullSchema = context.getFileSchema();
|
||||||
|
|
||||||
|
String name = fullSchema.getName();
|
||||||
|
|
||||||
|
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
|
||||||
|
String tsField = config.getParser().getParseSpec().getTimestampSpec().getTimestampColumn();
|
||||||
|
|
||||||
|
List<DimensionSchema> dimensionSchema = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
|
||||||
|
Set<String> dimensions = Sets.newHashSet();
|
||||||
|
for (DimensionSchema dim : dimensionSchema) {
|
||||||
|
dimensions.add(dim.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<String> metricsFields = Sets.newHashSet();
|
||||||
|
for (AggregatorFactory agg : config.getSchema().getDataSchema().getAggregators()) {
|
||||||
|
metricsFields.addAll(agg.requiredFields());
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Type> partialFields = Lists.newArrayList();
|
||||||
|
|
||||||
|
for (Type type : fullSchema.getFields()) {
|
||||||
|
if (tsField.equals(type.getName())
|
||||||
|
|| metricsFields.contains(type.getName())
|
||||||
|
|| dimensions.size() > 0 && dimensions.contains(type.getName())
|
||||||
|
|| dimensions.size() == 0) {
|
||||||
|
partialFields.add(type);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new MessageType(name, partialFields);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ReadContext init(InitContext context)
|
||||||
|
{
|
||||||
|
MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), getPartialReadSchema(context));
|
||||||
|
return new ReadContext(requestedProjection);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecordMaterializer<GenericRecord> prepareForRead(
|
||||||
|
Configuration configuration, Map<String, String> keyValueMetaData,
|
||||||
|
MessageType fileSchema, ReadContext readContext
|
||||||
|
)
|
||||||
|
{
|
||||||
|
|
||||||
|
MessageType parquetSchema = readContext.getRequestedSchema();
|
||||||
|
Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
|
||||||
|
|
||||||
|
Class<? extends AvroDataSupplier> suppClass = configuration.getClass(
|
||||||
|
AVRO_DATA_SUPPLIER,
|
||||||
|
SpecificDataSupplier.class,
|
||||||
|
AvroDataSupplier.class
|
||||||
|
);
|
||||||
|
AvroDataSupplier supplier = ReflectionUtils.newInstance(suppClass, configuration);
|
||||||
|
return new AvroRecordMaterializer<GenericRecord>(parquetSchema, avroSchema, supplier.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1 @@
|
||||||
|
io.druid.data.input.parquet.ParquetExtensionsModule
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package io.druid.data.input.parquet;
|
||||||
|
|
||||||
|
import io.druid.indexer.HadoopDruidIndexerConfig;
|
||||||
|
import org.apache.avro.generic.GenericRecord;
|
||||||
|
import org.apache.avro.util.Utf8;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.mapreduce.InputFormat;
|
||||||
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.mapreduce.RecordReader;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptContext;
|
||||||
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
||||||
|
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
|
||||||
|
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
public class DruidParquetInputFormatTest
|
||||||
|
{
|
||||||
|
@Test
|
||||||
|
public void test() throws IOException, InterruptedException
|
||||||
|
{
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
Job job = Job.getInstance(conf);
|
||||||
|
|
||||||
|
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
|
||||||
|
"example/wikipedia_hadoop_parquet_job.json"));
|
||||||
|
|
||||||
|
config.intoConfiguration(job);
|
||||||
|
|
||||||
|
File testFile = new File("example/wikipedia_list.parquet");
|
||||||
|
Path path = new Path(testFile.getAbsoluteFile().toURI());
|
||||||
|
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
|
||||||
|
|
||||||
|
InputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration());
|
||||||
|
|
||||||
|
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
|
||||||
|
RecordReader reader = inputFormat.createRecordReader(split, context);
|
||||||
|
|
||||||
|
reader.initialize(split, context);
|
||||||
|
|
||||||
|
reader.nextKeyValue();
|
||||||
|
|
||||||
|
GenericRecord data = (GenericRecord) reader.getCurrentValue();
|
||||||
|
|
||||||
|
// field not read, should return null
|
||||||
|
assertEquals(data.get("added"), null);
|
||||||
|
|
||||||
|
assertEquals(data.get("page"), new Utf8("Gypsy Danger"));
|
||||||
|
|
||||||
|
reader.close();
|
||||||
|
}
|
||||||
|
}
|
1
pom.xml
1
pom.xml
|
@ -102,6 +102,7 @@
|
||||||
<module>extensions-contrib/kafka-eight-simpleConsumer</module>
|
<module>extensions-contrib/kafka-eight-simpleConsumer</module>
|
||||||
<module>extensions-contrib/rabbitmq</module>
|
<module>extensions-contrib/rabbitmq</module>
|
||||||
<module>extensions-contrib/distinctcount</module>
|
<module>extensions-contrib/distinctcount</module>
|
||||||
|
<module>extensions-contrib/parquet-extensions</module>
|
||||||
|
|
||||||
<!-- distribution packaging -->
|
<!-- distribution packaging -->
|
||||||
<module>distribution</module>
|
<module>distribution</module>
|
||||||
|
|
Loading…
Reference in New Issue