From 0f67ff7dfb6ddec5e9f9552f74d3a4a31b0a6829 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?DuNinglin=20=5B=E6=9D=9C=E5=AE=81=E6=9E=97=5D?=
Date: Mon, 28 Mar 2016 15:51:36 +0800
Subject: [PATCH] reorganize code folder according to recent upstream folder
 changes, separate it from the avro code and move it into extensions-contrib;
 docs rewritten too

---
 .../development/extensions-contrib/parquet.md |  90 ++++++++++++++++
 docs/content/development/extensions.md        |   1 +
 .../example/parquet_no_metric_job.json        |  64 +++++++++++
 .../example/wikipedia_hadoop_parquet_job.json |  75 +++++++++++++
 .../wikipedia_hadoop_parquet_task.json        |  58 ++++++++++
 .../example/wikipedia_list.parquet            | Bin 0 -> 4312 bytes
 extensions-contrib/parquet-extensions/pom.xml |  45 ++++++++
 .../parquet/DruidParquetInputFormat.java      |  31 ++++++
 .../parquet/ParquetExtensionsModule.java      |  48 +++++++++
 .../parquet/ParquetHadoopInputRowParser.java  |  80 ++++++++++++++
 .../parquet/avro/DruidParquetReadSupport.java | 102 ++++++++++++++++++
 .../io.druid.initialization.DruidModule      |   1 +
 .../parquet/DruidParquetInputFormatTest.java  |  76 +++++++++++++
 pom.xml                                       |   1 +
 14 files changed, 672 insertions(+)
 create mode 100644 docs/content/development/extensions-contrib/parquet.md
 create mode 100755 extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
 create mode 100755 extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json
 create mode 100755 extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json
 create mode 100755 extensions-contrib/parquet-extensions/example/wikipedia_list.parquet
 create mode 100644 extensions-contrib/parquet-extensions/pom.xml
 create mode 100755 extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java
 create mode 100644 extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java
 create mode 100755 extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java
 create mode 100755 extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java
 create mode 100755 extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
 create mode 100644 extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java

diff --git a/docs/content/development/extensions-contrib/parquet.md b/docs/content/development/extensions-contrib/parquet.md
new file mode 100644
index 00000000000..a3c3a7060dc
--- /dev/null
+++ b/docs/content/development/extensions-contrib/parquet.md
@@ -0,0 +1,90 @@
+# Parquet
+
+This extension enables Druid to ingest and understand the Apache Parquet data format in offline (Hadoop-based batch) ingestion.
+
+## Parquet Hadoop Parser
+
+This parser is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` field of `inputSpec` in `ioConfig` must be set to `"io.druid.data.input.parquet.DruidParquetInputFormat"`. Also make sure to include `io.druid.extensions:druid-avro-extensions` as an extension.
+
+Field     | Type        | Description                                                                               | Required
+----------|-------------|-------------------------------------------------------------------------------------------|---------
+type      | String      | This should say `parquet`.                                                                | yes
+parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Should be a `timeAndDims` parseSpec. | yes
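+
+For reference, the `parser` block on its own looks like the following minimal sketch; the `time` timestamp column and the `name` dimension are just illustrative placeholders taken from the full example below:
+
+```json
+{
+  "type": "parquet",
+  "parseSpec": {
+    "format": "timeAndDims",
+    "timestampSpec": {
+      "column": "time",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["name"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    }
+  }
+}
+```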
+
+For example, a complete Hadoop index job spec:
+```json
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+        "paths": "no_metrics"
+      },
+      "metadataUpdateSpec": {
+        "type": "postgresql",
+        "connectURI": "jdbc:postgresql://localhost/druid",
+        "user" : "druid",
+        "password" : "asdf",
+        "segmentTable": "druid_segments"
+      },
+      "segmentOutputPath": "tmp/segments"
+    },
+    "dataSchema": {
+      "dataSource": "no_metrics",
+      "parser": {
+        "type": "parquet",
+        "parseSpec": {
+          "format": "timeAndDims",
+          "timestampSpec": {
+            "column": "time",
+            "format": "auto"
+          },
+          "dimensionsSpec": {
+            "dimensions": [
+              "name"
+            ],
+            "dimensionExclusions": [],
+            "spatialDimensions": []
+          }
+        }
+      },
+      "metricsSpec": [{
+        "type": "count",
+        "name": "count"
+      }],
+      "granularitySpec": {
+        "type": "uniform",
+        "segmentGranularity": "DAY",
+        "queryGranularity": "ALL",
+        "intervals": ["2015-12-31/2016-01-02"]
+      }
+    },
+    "tuningConfig": {
+      "type": "hadoop",
+      "workingPath": "tmp/working_path",
+      "partitionsSpec": {
+        "targetPartitionSize": 5000000
+      },
+      "jobProperties" : {},
+      "leaveIntermediate": true
+    }
+  }
+}
+
+```
+
+Almost all of the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` (for example, `-Duser.timezone=UTC` in the map and reduce JVM options, as in the example job files) so that the generated HDFS paths do not depend on the local timezone.
+
+There is no need to upgrade your cluster to a SNAPSHOT build; you can simply launch a Hadoop indexing job with your locally compiled jars, like so:
+
+```bash
+HADOOP_CLASS_PATH=`hadoop classpath | sed s/*.jar/*/g`
+
+java -Xmx32m -Duser.timezone=UTC -Dfile.encoding=UTF-8 \
+  -classpath config/overlord:config/_common:lib/*:$HADOOP_CLASS_PATH:extensions/druid-avro-extensions/* \
+  io.druid.cli.Main index hadoop \
+  wikipedia_hadoop_parquet_job.json
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 69897e0cca5..c53c4d1285a 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -45,6 +45,7 @@ If you'd like to take on maintenance for a community extension, please post on [
 |druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
 |druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
 |druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
+|druid-parquet-extensions|Support for data in the Apache Parquet data format.|[link](../development/extensions-contrib/parquet.html)|
 |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
 |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
 |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
diff --git a/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json b/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
new file mode 100755
index 00000000000..ead44698879
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/example/parquet_no_metric_job.json
@@ -0,0 +1,64 @@
+{
+  "type": "index_hadoop",
+  "spec": {
+    "ioConfig": {
+      "type": "hadoop",
+      "inputSpec": {
+        "type": "static",
+        "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat",
+        "paths": "no_metrics"
+      },
+      "metadataUpdateSpec": {
"type": "postgresql", + "connectURI": "jdbc:postgresql://localhost/druid", + "user" : "druid", + "password" : "asdf", + "segmentTable": "druid_segments" + }, + "segmentOutputPath": "tmp/segments" + }, + "dataSchema": { + "dataSource": "no_metrics", + "parser": { + "type": "parquet", + "parseSpec": { + "format": "timeAndDims", + "timestampSpec": { + "column": "time", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "name" + ], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + "metricsSpec": [{ + "type": "count", + "name": "count" + }], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "ALL", + "intervals": ["2015-12-31/2016-01-02"] + } + }, + "tuningConfig": { + "type": "hadoop", + "workingPath": "tmp/working_path", + "partitionsSpec": { + "targetPartitionSize": 5000000 + }, + "jobProperties" : { + "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" + }, + "leaveIntermediate": true + } + } +} diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json new file mode 100755 index 00000000000..2d5947899b3 --- /dev/null +++ b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_job.json @@ -0,0 +1,75 @@ +{ + "type": "index_hadoop", + "spec": { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat", + "paths": "wikipedia.gz.parquet" + }, + "metadataUpdateSpec": { + "type": "postgresql", + "connectURI": "jdbc:postgresql://localhost/druid", + "user" : "druid", + "password" : "asdf", + "segmentTable": "druid_segments" + }, + "segmentOutputPath": "/tmp/segments" + }, + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "parquet", + "parseSpec": { + "format": "timeAndDims", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + "language", + "user", + "unpatrolled" + ], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + "metricsSpec": [{ + "type": "count", + "name": "count" + }, { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "intervals": ["2013-08-30/2013-09-02"] + } + }, + "tuningConfig": { + "type": "hadoop", + "workingPath": "tmp/working_path", + "partitionsSpec": { + "targetPartitionSize": 5000000 + }, + "jobProperties" : { + "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" + }, + "leaveIntermediate": true + } + } +} diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json new file mode 100755 index 
00000000000..858636b7ff3 --- /dev/null +++ b/extensions-contrib/parquet-extensions/example/wikipedia_hadoop_parquet_task.json @@ -0,0 +1,58 @@ +{ + "type" : "index_hadoop", + "spec" : { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "io.druid.data.input.parquet.DruidParquetInputFormat", + "paths": "wikipedia_list.parquet" + } + }, + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "parquet", + "parseSpec": { + "format": "timeAndDims", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + "language", + "user" + ], + "dimensionExclusions": [], + "spatialDimensions": [] + } + } + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "intervals": ["2013-08-31/2013-09-01"] + } + }, + "tuningConfig": { + "type": "hadoop", + "partitionsSpec": { + "targetPartitionSize": 5000000 + } + } + } +} diff --git a/extensions-contrib/parquet-extensions/example/wikipedia_list.parquet b/extensions-contrib/parquet-extensions/example/wikipedia_list.parquet new file mode 100755 index 0000000000000000000000000000000000000000..5d0624d0d687aaac62a1defca540416b5233eaee GIT binary patch literal 4312 zcmb_geQXow8Na)n9mh$E3%To=M#5P2m9WIM9r8hDDRGxXKuwg8kWpaZvwe0hx!gJD zbAv~NvS`ifj8(w}LpG{F1XfXEa z$3-}-g!!L)zkbi>?|q(EZg%alV-R)_Yb(X{V4%fx#e^=F1Ia%^2P*>zm%f{Rs7p?L zUXRC<_=Uf}{}0Cv*EZh#(m(k7W=abU{ovxL6`a9PY&}-v#t;l$0i!*h9j`5vG-;rP z#U59lbP~c0Shb`aRg`N26dR@q<&#xy(4&P4Kc4=jWDI~G!-ip%@tQWiE6Nkv-F(=` z_huFFg(f~7i|Vr@Xo1+ z(60t^j@Bw(xiQua>|Ej1=fJB6w_e#g-yB-lU3SymbPv^6C-BonmSsOV)n7d`5`!)k zmwiSm_7-7ImAi#aydZX%T-~(5_$UH(g&8WV(LwK_k5TC2g`RbKMNU7ACDe`=?&TPf z_M2KoN~FU?o+}a-1&Uoo9EidB=!j*qL_ckbSte)B*1^BMy%Zn&&Dn+eO4G{|AAVF_ zegc~l7Uvdn%CE1_U;hti=x*m0!^doc{WssdOwbKy%qvjvy?z;Rc zW8G2+jlFs2PcmC10`5}_{C=8%JzpumtTB1z(K`6h@dD5Ay7R>TGn@a}JK=<4711k6 z3feHY8`G^!mc-m0zw;sSy++g2L{WXffiwV(wh9Yt)L1cU^oy~$o(4Rn{Ur$do%l8Py%vj^&$|;X$OUc&V zwB|Mw+deZ1{dF7h!t;u+E48Ac5P9;o^Cuwa-E-qNlGpYsjLoU}_gt$eFkQ4@YKRDQ z7ey3&tAiFe-sECK9oZ@U1swnTDb36&y)_p7%Vfiwb;m+GTLN!%o*3w2S~}k_?|3WR z*x7RE#(=NM)o8AL`|!z;17}BnYS{Do@QKmU-1hk4k;C!#4ahdjCTQb}wl5DTejUNy zQQM_$?Vz|L4553Kike&ky^1YRyeW1{|4zeX>}=iDu=mi;&OJz_hizgwqy2t9xPqdupS^Q>V{6*?Mr>>AqD3fm9hC2J0Q4m7Tr`F1kwk6>(E z?Ou_?W-l+ZRhO09+w6{-ZT1>R)qX;~YO~ka95v1vuWQ9h|+U#jni{Zq^RA zYa82VXIt(s&9=`rwv*$;(sKLYV0+`>{iQlrYyLNUuNCy1+g$gr^m6WUo?{EJzYfO_bw!gF2i@`L0_0`fUcabkIFeljE^?#Kn0!`F%!7u7KG4Hen0Jp zumUj7qd1<6cJq<21=Ij(1@$pvG&RZd@tnwTG$*E3a(#S+69xH%BrMlY1KTBWvS=_9 zDX0P=bS%XMBUF$EZBz?YphS+B3Sx+&LqiDyl~KY`-E=ra`RKF~j_z#+*wh>isU}*1 z4l+EK+S3TUmrt9QCo({mNOgx&`}q+rM2P~=va~<7hDO4)04BKJSZsV`Aa%tUYOK&f zT4ORe7>euh4`6&8CUwqLpA_T=a4}W@fG7ZFA}NChVEi@=4mgvK0ri`J?9Wkfl0rG& zGGq?MKZU^_fIR*TAU{P_y3%Qpfr~JnK)?Yfv1&smA80cwQ#q8uOECU#7;FRZ6MHau zUlpZP)@2DGPGd0G27y|omy$zzRfJ`Zq>%5!U=tu$X`1AR`I6lh5aH@Hr6ejT6arEV z_yrih2ZMvo6)lfEQ>Y49nqn!v|AD~)C$U<)BID~a=-eb7Wo#VA@4{f)Nj%nc$Onc@ zkY&!wh;s-&g@8RL@xQwaIkq7qBFS`F2p|VgJGQ||WN!&%KbA3Axf3dL`>|~1-o-Pz zG8adN4dmdL5b`eWTv3d_PKhRmj2T1JmrrHRkTIJ%_cvzDk#A_q@y`IeHUf@hEtL2T z0@p*vVxoi>f>sKYkq+7ls)^8L4fn8CFzAWUqE+B~t=+Up`6-biKG7XDi_s8mt}%mW zX2d6&OU(g>X8mDv&4Ht4i7a4CE|Xvh>!~LGoVwI`&mTDrGD)KlD#1#;OSy07mBrlQ~f86Pp4dNK42L%u5QrT z+R{W&vLn@2is8adAkj*a`@VM8(`6>DRU+6^Wj$GI)-FKmm1{>U&Gi}ZQa4^ZJl($6 zE<)AspFs$CAUrMlJADxG&1AhP>lRE~yb*@=n<^>FVGmSSRaI6xD!df|s?uw(wpDER 
c1!yl_Rk3Y*z(Lz=K5q$h2|N&+!G9Y61vX-S0ssI2 literal 0 HcmV?d00001 diff --git a/extensions-contrib/parquet-extensions/pom.xml b/extensions-contrib/parquet-extensions/pom.xml new file mode 100644 index 00000000000..bed26856b2b --- /dev/null +++ b/extensions-contrib/parquet-extensions/pom.xml @@ -0,0 +1,45 @@ + + + + druid + io.druid + 0.9.1-SNAPSHOT + ../../pom.xml + + 4.0.0 + + druid-parquet-extensions + + + + io.druid.extensions + druid-avro-extensions + ${project.parent.version} + provided + + + org.apache.parquet + parquet-avro + 1.8.0 + + + io.druid + druid-indexing-hadoop + ${project.parent.version} + provided + + + org.apache.hadoop + hadoop-client + provided + + + junit + junit + test + + + + diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java new file mode 100755 index 00000000000..844280bd6c4 --- /dev/null +++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/DruidParquetInputFormat.java @@ -0,0 +1,31 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.parquet; + +import org.apache.avro.generic.GenericRecord; +import org.apache.parquet.avro.DruidParquetReadSupport; +import org.apache.parquet.hadoop.ParquetInputFormat; + +public class DruidParquetInputFormat extends ParquetInputFormat +{ + public DruidParquetInputFormat() + { + super(DruidParquetReadSupport.class); + } +} diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java new file mode 100644 index 00000000000..657a94ae196 --- /dev/null +++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetExtensionsModule.java @@ -0,0 +1,48 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.data.input.parquet;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class ParquetExtensionsModule implements DruidModule
+{
+
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return Arrays.asList(
+        new SimpleModule("ParquetInputRowParserModule")
+            .registerSubtypes(
+                new NamedType(ParquetHadoopInputRowParser.class, "parquet")
+            )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  { }
+}
diff --git a/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java
new file mode 100755
index 00000000000..fef4022297d
--- /dev/null
+++ b/extensions-contrib/parquet-extensions/src/main/java/io/druid/data/input/parquet/ParquetHadoopInputRowParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package io.druid.data.input.parquet; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Lists; +import io.druid.data.input.AvroStreamInputRowParser; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.avro.GenericRecordAsMap; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import org.apache.avro.generic.GenericRecord; +import org.joda.time.DateTime; + +import java.util.List; + +public class ParquetHadoopInputRowParser implements InputRowParser +{ + private final ParseSpec parseSpec; + private final List dimensions; + + @JsonCreator + public ParquetHadoopInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec + ) + { + this.parseSpec = parseSpec; + + List dimensionSchema = parseSpec.getDimensionsSpec().getDimensions(); + this.dimensions = Lists.newArrayList(); + for (DimensionSchema dim : dimensionSchema) { + this.dimensions.add(dim.getName()); + } + } + + /** + * imitate avro extension {@link AvroStreamInputRowParser#parseGenericRecord(GenericRecord, ParseSpec, List, boolean)} + */ + @Override + public InputRow parse(GenericRecord record) + { + GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, false); + TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); + DateTime dateTime = timestampSpec.extractTimestamp(genericRecordAsMap); + return new MapBasedInputRow(dateTime, dimensions, genericRecordAsMap); + } + + @JsonProperty + @Override + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new ParquetHadoopInputRowParser(parseSpec); + } +} diff --git a/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java b/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java new file mode 100755 index 00000000000..36cf7a18518 --- /dev/null +++ b/extensions-contrib/parquet-extensions/src/main/java/org/apache/parquet/avro/DruidParquetReadSupport.java @@ -0,0 +1,102 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.avro; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class DruidParquetReadSupport extends AvroReadSupport +{ + private MessageType getPartialReadSchema(InitContext context) + { + MessageType fullSchema = context.getFileSchema(); + + String name = fullSchema.getName(); + + HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); + String tsField = config.getParser().getParseSpec().getTimestampSpec().getTimestampColumn(); + + List dimensionSchema = config.getParser().getParseSpec().getDimensionsSpec().getDimensions(); + Set dimensions = Sets.newHashSet(); + for (DimensionSchema dim : dimensionSchema) { + dimensions.add(dim.getName()); + } + + Set metricsFields = Sets.newHashSet(); + for (AggregatorFactory agg : config.getSchema().getDataSchema().getAggregators()) { + metricsFields.addAll(agg.requiredFields()); + } + + List partialFields = Lists.newArrayList(); + + for (Type type : fullSchema.getFields()) { + if (tsField.equals(type.getName()) + || metricsFields.contains(type.getName()) + || dimensions.size() > 0 && dimensions.contains(type.getName()) + || dimensions.size() == 0) { + partialFields.add(type); + } + } + + return new MessageType(name, partialFields); + } + + public ReadContext init(InitContext context) + { + MessageType requestedProjection = getSchemaForRead(context.getFileSchema(), getPartialReadSchema(context)); + return new ReadContext(requestedProjection); + } + + @Override + public RecordMaterializer prepareForRead( + Configuration configuration, Map keyValueMetaData, + MessageType fileSchema, ReadContext readContext + ) + { + + MessageType parquetSchema = readContext.getRequestedSchema(); + Schema avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema); + + Class suppClass = configuration.getClass( + AVRO_DATA_SUPPLIER, + SpecificDataSupplier.class, + AvroDataSupplier.class + ); + AvroDataSupplier supplier = ReflectionUtils.newInstance(suppClass, configuration); + return new AvroRecordMaterializer(parquetSchema, avroSchema, supplier.get()); + } + +} diff --git a/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100755 index 00000000000..26725119c5d --- /dev/null +++ b/extensions-contrib/parquet-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.data.input.parquet.ParquetExtensionsModule diff --git a/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java new file mode 100644 
index 00000000000..b503a57255d --- /dev/null +++ b/extensions-contrib/parquet-extensions/src/test/java/io/druid/data/input/parquet/DruidParquetInputFormatTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.parquet; + +import io.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.util.ReflectionUtils; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class DruidParquetInputFormatTest +{ + @Test + public void test() throws IOException, InterruptedException + { + Configuration conf = new Configuration(); + Job job = Job.getInstance(conf); + + HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromFile(new File( + "example/wikipedia_hadoop_parquet_job.json")); + + config.intoConfiguration(job); + + File testFile = new File("example/wikipedia_list.parquet"); + Path path = new Path(testFile.getAbsoluteFile().toURI()); + FileSplit split = new FileSplit(path, 0, testFile.length(), null); + + InputFormat inputFormat = ReflectionUtils.newInstance(DruidParquetInputFormat.class, job.getConfiguration()); + + TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = inputFormat.createRecordReader(split, context); + + reader.initialize(split, context); + + reader.nextKeyValue(); + + GenericRecord data = (GenericRecord) reader.getCurrentValue(); + + // field not read, should return null + assertEquals(data.get("added"), null); + + assertEquals(data.get("page"), new Utf8("Gypsy Danger")); + + reader.close(); + } +} diff --git a/pom.xml b/pom.xml index dccccbd52a8..b5a06fe3d95 100644 --- a/pom.xml +++ b/pom.xml @@ -102,6 +102,7 @@ extensions-contrib/kafka-eight-simpleConsumer extensions-contrib/rabbitmq extensions-contrib/distinctcount + extensions-contrib/parquet-extensions distribution