diff --git a/docs/content/development/extensions-contrib/parquet.md b/docs/content/development/extensions-contrib/parquet.md
new file mode 100644
index 00000000000..a3c3a7060dc
--- /dev/null
+++ b/docs/content/development/extensions-contrib/parquet.md
@@ -0,0 +1,90 @@
+# Parquet
+
+This extension enables Druid to ingest and understand the Apache Parquet data format in offline (Hadoop-based batch) ingestion.
+
+## Parquet Hadoop Parser
+
+This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.parquet.DruidParquetInputFormat"`. Also make sure to include `"io.druid.extensions:druid-avro-extensions"` as an extension, since this extension depends on it.
+
+Field | Type | Description | Required
+----------|-------------|----------------------------------------------------------------------------------------|---------
+type | String | This should say `parquet` | yes
+parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a timeAndDims parseSpec. | yes
+
+For example:
+```json
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+ "paths": "no_metrics"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "no_metrics",
+ "parser": {
+ "type": "parquet",
+ "parseSpec": {
+ "format": "timeAndDims",
+ "timestampSpec": {
+ "column": "time",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "name"
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [{
+ "type": "count",
+ "name": "count"
+ }],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "ALL",
+ "intervals": ["2015-12-31/2016-01-02"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {},
+ "leaveIntermediate": true
+ }
+ }
+}
+
+```
+
+Almost all the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set the JVM timezone in `jobProperties` (for example, `-Duser.timezone=UTC` for the map and reduce tasks) so that HDFS paths do not depend on the local timezone.
+
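+A sketch of such a `jobProperties` block, taken from the bundled example specs (trim or extend the JVM options to fit your cluster):
+
+```json
+"jobProperties" : {
+  "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8",
+  "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8"
+}
+```
+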
+There is no need to upgrade your cluster to a SNAPSHOT version; you can fire a Hadoop job with your locally compiled jars like this:
+
+```bash
+HADOOP_CLASS_PATH=`hadoop classpath | sed 's/\*\.jar/*/g'`
+
+java -Xmx32m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
+ -classpath config/overlord:config/_common:lib/*:$HADOOP_CLASS_PATH:extensions/druid-avro-extensions/* \
+ io.druid.cli.Main index hadoop \
+ wikipedia_hadoop_parquet_job.json
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 69897e0cca5..c53c4d1285a 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -45,6 +45,7 @@ If you'd like to take on maintenance for a community extension, please post on [
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
+|druid-parquet-extensions|Support for data in the Apache Parquet data format. Requires druid-avro-extensions.|[link](../development/extensions-contrib/parquet.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
diff --git a/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json b/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
new file mode 100755
index 00000000000..ead44698879
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
@@ -0,0 +1,64 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+ "paths": "no_metrics"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "no_metrics",
+ "parser": {
+ "type": "parquet",
+ "parseSpec": {
+ "format": "timeAndDims",
+ "timestampSpec": {
+ "column": "time",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "name"
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [{
+ "type": "count",
+ "name": "count"
+ }],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "ALL",
+ "intervals": ["2015-12-31/2016-01-02"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
new file mode 100755
index 00000000000..2d5947899b3
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
@@ -0,0 +1,75 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+ "paths": "wikipedia.gz.parquet"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "/tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "wikipedia",
+ "parser": {
+ "type": "parquet",
+ "parseSpec": {
+ "format": "timeAndDims",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "page",
+ "language",
+ "user",
+ "unpatrolled"
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [{
+ "type": "count",
+ "name": "count"
+ }, {
+ "type": "doubleSum",
+ "name": "deleted",
+ "fieldName": "deleted"
+ }, {
+ "type": "doubleSum",
+ "name": "delta",
+ "fieldName": "delta"
+ }],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2013-08-30/2013-09-02"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json
new file mode 100755
index 00000000000..858636b7ff3
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json
@@ -0,0 +1,58 @@
+{
+ "type" : "index_hadoop",
+ "spec" : {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+ "paths": "wikipedia_list.parquet"
+ }
+ },
+ "dataSchema": {
+ "dataSource": "wikipedia",
+ "parser": {
+ "type": "parquet",
+ "parseSpec": {
+ "format": "timeAndDims",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "page",
+ "language",
+ "user"
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [
+ {
+ "type": "count",
+ "name": "count"
+ },
+ {
+ "type": "doubleSum",
+ "name": "added",
+ "fieldName": "added"
+ }
+ ],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2013-08-31/2013-09-01"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ }
+ }
+ }
+}
diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_list.parquet b/extensions-contrib/parquet-extensions/example/wikipedia_list.parquet
new file mode 100755
index 00000000000..5d0624d0d68
Binary files /dev/null and b/extensions-contrib/parquet-extensions/example/wikipedia_list.parquet differ
diff --git a/extensions-contrib/parquet-extensions/pom.xml b/extensions-contrib/parquet-extensions/pom.xml
new file mode 100644
index 00000000000..bed26856b2b
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>io.druid</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>druid-parquet-extensions</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.druid.extensions</groupId>
+      <artifactId>druid-avro-extensions</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.8.0</version>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-hadoop</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java
new file mode 100755
index 00000000000..844280bd6c4
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.parquet;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.parquet.avro.DruidParquetReadSupport;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+
+public class DruidParquetInputFormat extends ParquetInputFormat<GenericRecord>
+{
+ public DruidParquetInputFormat()
+ {
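+    // read Parquet records through DruidParquetReadSupport, which projects columns and yields Avro GenericRecords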
+ super(DruidParquetReadSupport.class);
+ }
+}
diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java
new file mode 100644
index 00000000000..657a94ae196
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.data.input.parquet;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ParquetExtensionsModule implements DruidModule
+{
+
+ @Override
+  public List<? extends Module> getJacksonModules()
+ {
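+    // register the "parquet" type so parser specs can reference ParquetHadoopInputRowParser by name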
+ return Arrays.asList(
+        new SimpleModule("ParquetInputRowParserModule")
+ .registerSubtypes(
+ new NamedType(ParquetHadoopInputRowParser.class, "parquet")
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ { }
+}
diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java
new file mode 100755
index 00000000000..fef4022297d
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
+import io.druid.data.input.AvroStreamInputRowParser;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.data.input.avro.GenericRecordAsMap;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.ParseSpec;
+import io.druid.data.input.impl.TimestampSpec;
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.DateTime;
+
+import java.util.List;
+
+public class ParquetHadoopInputRowParser implements InputRowParser<GenericRecord>
+{
+  private final ParseSpec parseSpec;
+  private final List<String> dimensions;
+
+ @JsonCreator
+ public ParquetHadoopInputRowParser(
+ @JsonProperty("parseSpec") ParseSpec parseSpec
+ )
+ {
+ this.parseSpec = parseSpec;
+
+    List<DimensionSchema> dimensionSchema = parseSpec.getDimensionsSpec().getDimensions();
+ this.dimensions = Lists.newArrayList();
+ for (DimensionSchema dim : dimensionSchema) {
+ this.dimensions.add(dim.getName());
+ }
+ }
+
+ /**
+   * Imitates the avro extension's {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean)}
+ */
+ @Override
+ public InputRow parse(GenericRecord record)
+ {
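+    // expose the Avro GenericRecord as a Map so the existing TimestampSpec and MapBasedInputRow machinery can be reused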
+ GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false);
+ TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
+ DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap);
+ return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap);
+ }
+
+ @JsonProperty
+ @Override
+ public ParseSpec getParseSpec()
+ {
+ return parseSpec;
+ }
+
+ @Override
+ public InputRowParser withParseSpec(ParseSpec parseSpec)
+ {
+ return new ParquetHadoopInputRowParser(parseSpec);
+ }
+}
diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java b/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java
new file mode 100755
index 00000000000..36cf7a18518
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.druid.data.input.impl.DimensionSchema;
+import io.druid.indexer.HadoopDruidIndexerConfig;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class DruidParquetReadSupport extends AvroReadSupport<GenericRecord>
+{
+ private MessageType getPartialReadSchema(InitContext context)
+ {
+ MessageType fullSchema = context.getFileSchema();
+
+ String name = fullSchema.getName();
+
+ HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
+ String tsField = config.getParser().getParseSpec().getTimestampSpec().getTimestampColumn();
+
+    List<DimensionSchema> dimensionSchema = config.getParser().getParseSpec().getDimensionsSpec().getDimensions();
+    Set<String> dimensions = Sets.newHashSet();
+ for (DimensionSchema dim : dimensionSchema) {
+ dimensions.add(dim.getName());
+ }
+
+    Set<String> metricsFields = Sets.newHashSet();
+ for (AggregatorFactory agg : config.getSchema().getDataSchema().getAggregators()) {
+ metricsFields.addAll(agg.requiredFields());
+ }
+
+    List<Type> partialFields = Lists.newArrayList();
+
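+    // keep only the timestamp column, the declared dimensions and the fields required by the metrics;
+    // when no dimensions are declared, keep every column so all of them remain available as dimensions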
+ for (Type type : fullSchema.getFields()) {
+ if (tsField.equals(type.getName())
+ || metricsFields.contains(type.getName())
+ || dimensions.size() > 0 && dimensions.contains(type.getName())
+ || dimensions.size() == 0) {
+ partialFields.add(type);
+ }
+ }
+
+ return new MessageType(name, partialFields);
+ }
+
+ public ReadContext init(InitContext context)
+ {
+ MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), getPartialReadSchema(context));
+ return new ReadContext(requestedProjection);
+ }
+
+ @Override
+  public RecordMaterializer<GenericRecord> prepareForRead(
+      Configuration configuration, Map<String, String> keyValueMetaData,
+ MessageType fileSchema, ReadContext readContext
+ )
+ {
+
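+    // convert the projected Parquet schema to an Avro schema and materialize rows as Avro GenericRecords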
+ MessageType parquetSchema = readContext.getRequestedSchema();
+ Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
+
+    Class<? extends AvroDataSupplier> suppClass = configuration.getClass(
+ AVRO_DATA_SUPPLIER,
+ SpecificDataSupplier.class,
+ AvroDataSupplier.class
+ );
+ AvroDataSupplier supplier = ReflectionUtils.newInstance(suppClass, configuration);
+ return new AvroRecordMaterializer(parquetSchema, avroSchema, supplier.get());
+ }
+
+}
diff --git a/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100755
index 00000000000..26725119c5d
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.data.input.parquet.ParquetExtensionsModule
diff --git a/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java
new file mode 100644
index 00000000000..b503a57255d
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package io.druid.data.input.parquet;
+
+import io.druid.indexer.HadoopDruidIndexerConfig;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class DruidParquetInputFormatTest
+{
+ @Test
+ public void test() throws IOException, InterruptedException
+ {
+ Configuration conf = new Configuration();
+ Job job = Job.getInstance(conf);
+
+ HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File(
+ "example/wikipedia_hadoop_parquet_job.json"));
+
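+    // push the ingestion spec into the Hadoop configuration so DruidParquetReadSupport can find the parseSpec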
+ config.intoConfiguration(job);
+
+ File testFile = new File("example/wikipedia_list.parquet");
+ Path path = new Path(testFile.getAbsoluteFile().toURI());
+ FileSplit split = new FileSplit(path, 0, testFile.length(), null);
+
+ InputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration());
+
+ TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
+ RecordReader reader = inputFormat.createRecordReader(split, context);
+
+ reader.initialize(split, context);
+
+ reader.nextKeyValue();
+
+ GenericRecord data = (GenericRecord) reader.getCurrentValue();
+
+ // field not read, should return null
+ assertEquals(data.get("added"), null);
+
+ assertEquals(data.get("page"), new Utf8("Gypsy Danger"));
+
+ reader.close();
+ }
+}
diff --git a/pom.xml b/pom.xml
index dccccbd52a8..b5a06fe3d95 100644
--- a/pom.xml
+++ b/pom.xml
@@ -102,6 +102,7 @@
extensions-contrib/kafka-eight-simpleConsumer
extensions-contrib/rabbitmq
extensions-contrib/distinctcount
+ extensions-contrib/parquet-extensions
distribution