From 95a58097e244fc59fe2a322f044527efac4d61c6 Mon Sep 17 00:00:00 2001 From: Keuntae Park Date: Wed, 27 Jul 2016 01:42:56 +0900 Subject: [PATCH] Hadoop InputRowParser for Orc file (#3019) * InputRowParser to decode OrcStruct from OrcNewInputFormat * add unit test for orc hadoop indexing * update docs and fix test code bug * doc updated * resove maven dependency conflict * remove unused imports * fix returning array type from Object[] to correct primitive array type * fix to support getDimension() of MapBasedRow : changing return type of orc list from array to list * rebase and updated based on comments * updated based on comments * on reflecting review comments * fix bug in typeStringFromParseSpec() and add unit test * add license header --- .../development/extensions-contrib/orc.md | 91 +++++ docs/content/development/extensions.md | 1 + .../example/hadoop_orc_job.json | 63 ++++ extensions-contrib/orc-extensions/pom.xml | 150 ++++++++ .../data/input/orc/OrcExtensionsModule.java | 46 +++ .../input/orc/OrcHadoopInputRowParser.java | 215 +++++++++++ .../io.druid.initialization.DruidModule | 1 + .../input/orc/DruidOrcInputFormatTest.java | 149 ++++++++ .../orc/OrcHadoopInputRowParserTest.java | 129 +++++++ .../input/orc/OrcIndexGeneratorJobTest.java | 356 ++++++++++++++++++ pom.xml | 2 + 11 files changed, 1203 insertions(+) create mode 100644 docs/content/development/extensions-contrib/orc.md create mode 100755 extensions-contrib/orc-extensions/example/hadoop_orc_job.json create mode 100644 extensions-contrib/orc-extensions/pom.xml create mode 100644 extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcExtensionsModule.java create mode 100644 extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcHadoopInputRowParser.java create mode 100755 extensions-contrib/orc-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule create mode 100644 extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/DruidOrcInputFormatTest.java create mode 100644 extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcHadoopInputRowParserTest.java create mode 100644 extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java diff --git a/docs/content/development/extensions-contrib/orc.md b/docs/content/development/extensions-contrib/orc.md new file mode 100644 index 00000000000..e16ca3ba279 --- /dev/null +++ b/docs/content/development/extensions-contrib/orc.md @@ -0,0 +1,91 @@ +--- +layout: doc_page +--- + +# Orc + +To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`. + +This extension enables Druid to ingest and understand the Apache Orc data format offline. + +## Orc Hadoop Parser + +This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inputSpec in ioConfig must be set to `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"`. + +Field | Type | Description | Required +----------|-------------|----------------------------------------------------------------------------------------|--------- +type | String | This should say `orc` | yes +parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Any parse spec that extends ParseSpec is possible but only their TimestampSpec and DimensionsSpec are used. | yes +typeString| String | String representation of Orc struct type info. 
If not specified, it is constructed automatically from the parseSpec, but all metric columns are dropped | no

For example, a string column `col1` and an array-of-string column `col2` are represented by the `typeString` `"struct<col1:string,col2:array<string>>"`.

Currently, only Java primitive types and arrays of Java primitive types are supported; of the compound [ORC types](https://orc.apache.org/docs/types.html), only `list` is supported, and nested lists (a list of lists) are not.

Example Hadoop indexing task:
```json
{
  "type": "index_hadoop",
  "spec": {
    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "static",
        "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
        "paths": "/data/path/in/HDFS/"
      },
      "metadataUpdateSpec": {
        "type": "postgresql",
        "connectURI": "jdbc:postgresql://localhost/druid",
        "user" : "druid",
        "password" : "asdf",
        "segmentTable": "druid_segments"
      },
      "segmentOutputPath": "tmp/segments"
    },
    "dataSchema": {
      "dataSource": "no_metrics",
      "parser": {
        "type": "orc",
        "parseSpec": {
          "format": "timeAndDims",
          "timestampSpec": {
            "column": "time",
            "format": "auto"
          },
          "dimensionsSpec": {
            "dimensions": [
              "name"
            ],
            "dimensionExclusions": [],
            "spatialDimensions": []
          }
        },
        "typeString": "struct<time:string,name:string>"
      },
      "metricsSpec": [{
        "type": "count",
        "name": "count"
      }],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "ALL",
        "intervals": ["2015-12-31/2016-01-02"]
      }
    },
    "tuningConfig": {
      "type": "hadoop",
      "workingPath": "tmp/working_path",
      "partitionsSpec": {
        "targetPartitionSize": 5000000
      },
      "jobProperties" : {},
      "leaveIntermediate": true
    }
  }
}
```

Almost all the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` (for example, passing `-Duser.timezone=UTC` to the map and reduce JVMs) so that HDFS paths do not depend on the local timezone.
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index e95c397149d..54cc5184b9f 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -48,6 +48,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
+|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
|druid-parquet-extensions|Support for data in Apache Parquet data format. 
Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)| |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)| diff --git a/extensions-contrib/orc-extensions/example/hadoop_orc_job.json b/extensions-contrib/orc-extensions/example/hadoop_orc_job.json new file mode 100755 index 00000000000..870f6077fc6 --- /dev/null +++ b/extensions-contrib/orc-extensions/example/hadoop_orc_job.json @@ -0,0 +1,63 @@ +{ + "type": "index_hadoop", + "spec": { + "ioConfig": { + "type": "hadoop", + "inputSpec": { + "type": "static", + "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat", + "paths": "wikipedia.gz.orc" + }, + "metadataUpdateSpec": { + "type": "postgresql", + "connectURI": "jdbc:postgresql://localhost/druid", + "user" : "druid", + "password" : "asdf", + "segmentTable": "druid_segments" + }, + "segmentOutputPath": "/tmp/segments" + }, + "dataSchema": { + "dataSource": "wikipedia", + "parser": { + "type": "orc", + "parseSpec": { + "format": "timeAndDims", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [ + "col1", + "col2" + ], + "dimensionExclusions": [], + "spatialDimensions": [] + } + }, + "typeString": "struct,val1:float>" + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "DAY", + "queryGranularity": "NONE", + "intervals": ["2015-01-01/2017-01-01"] + } + }, + "tuningConfig": { + "type": "hadoop", + "workingPath": "tmp/working_path", + "partitionsSpec": { + "targetPartitionSize": 5000000 + }, + "jobProperties" : { + "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps", + "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" + }, + "leaveIntermediate": true + } + } +} diff --git a/extensions-contrib/orc-extensions/pom.xml b/extensions-contrib/orc-extensions/pom.xml new file mode 100644 index 00000000000..606c7443adc --- /dev/null +++ b/extensions-contrib/orc-extensions/pom.xml @@ -0,0 +1,150 @@ + + + + io.druid.extensions.contrib + druid-orc-extensions + druid-orc-extensions + druid-orc-extensions + + + druid + io.druid + 0.9.2-SNAPSHOT + ../../pom.xml + + 4.0.0 + + + + io.druid + druid-indexing-hadoop + ${project.parent.version} + provided + + + org.apache.hive + hive-exec + ${hive.version} + + + + org.apache.hadoop + hadoop-client + + + commons-cli + commons-cli + + + commons-httpclient + commons-httpclient + + + log4j + log4j + + + commons-codec + commons-codec + + + commons-logging + commons-logging + + + commons-io + commons-io + + + commons-lang + commons-lang + + + org.apache.httpcomponents + httpclient + + + org.apache.httpcomponents + httpcore + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.apache.zookeeper + zookeeper + + + org.slf4j + slf4j-api + + + org.slf4j + slf4j-log4j12 + + + javax.ws.rs + jsr311-api + + + com.google.code.findbugs + jsr305 + + + org.mortbay.jetty + jetty-util + + + javax.activation + activation + + + com.google.protobuf + protobuf-java + + + com.sun.jersey + jersey-core + + + + + junit + junit + test + + + org.apache.hive + hive-orc + ${hive.version} + test + + + com.google.inject + guice + + + + diff 
--git a/extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcExtensionsModule.java b/extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcExtensionsModule.java new file mode 100644 index 00000000000..9619c884e1f --- /dev/null +++ b/extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcExtensionsModule.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.orc; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import io.druid.initialization.DruidModule; + +import java.util.Arrays; +import java.util.List; + +public class OrcExtensionsModule implements DruidModule +{ + @Override + public List getJacksonModules() { + return Arrays.asList( + new SimpleModule("OrcInputRowParserModule") + .registerSubtypes( + new NamedType(OrcHadoopInputRowParser.class, "orc") + ) + ); + } + + @Override + public void configure(Binder binder) { + + } +} diff --git a/extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcHadoopInputRowParser.java new file mode 100644 index 00000000000..3040504a007 --- /dev/null +++ b/extensions-contrib/orc-extensions/src/main/java/io/druid/data/input/orc/OrcHadoopInputRowParser.java @@ -0,0 +1,215 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.data.input.orc; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.ParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.lang.reflect.Array; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +public class OrcHadoopInputRowParser implements InputRowParser +{ + private final ParseSpec parseSpec; + private String typeString; + private final List dimensions; + private StructObjectInspector oip; + private final OrcSerde serde; + + @JsonCreator + public OrcHadoopInputRowParser( + @JsonProperty("parseSpec") ParseSpec parseSpec, + @JsonProperty("typeString") String typeString + ) + { + this.parseSpec = parseSpec; + this.typeString = typeString; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); + this.serde = new OrcSerde(); + initialize(); + } + + @Override + public InputRow parse(OrcStruct input) + { + Map map = Maps.newHashMap(); + List fields = oip.getAllStructFieldRefs(); + for (StructField field: fields) { + ObjectInspector objectInspector = field.getFieldObjectInspector(); + switch(objectInspector.getCategory()) { + case PRIMITIVE: + PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector)objectInspector; + map.put(field.getFieldName(), + primitiveObjectInspector.getPrimitiveJavaObject(oip.getStructFieldData(input, field))); + break; + case LIST: // array case - only 1-depth array supported yet + ListObjectInspector listObjectInspector = (ListObjectInspector)objectInspector; + map.put(field.getFieldName(), + getListObject(listObjectInspector, oip.getStructFieldData(input, field))); + break; + default: + break; + } + } + + TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); + DateTime dateTime = timestampSpec.extractTimestamp(map); + + return new MapBasedInputRow(dateTime, dimensions, map); + } + + private void initialize() + { + if (typeString == null) { + typeString = typeStringFromParseSpec(parseSpec); + } + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString); + Preconditions.checkArgument(typeInfo instanceof StructTypeInfo, + String.format("typeString should be struct type but not [%s]", typeString)); + Properties table = getTablePropertiesFromStructTypeInfo((StructTypeInfo)typeInfo); + serde.initialize(new 
Configuration(), table); + try { + oip = (StructObjectInspector) serde.getObjectInspector(); + } catch (SerDeException e) { + e.printStackTrace(); + } + } + + private List getListObject(ListObjectInspector listObjectInspector, Object listObject) + { + List objectList = listObjectInspector.getList(listObject); + List list = null; + ObjectInspector child = listObjectInspector.getListElementObjectInspector(); + switch(child.getCategory()) { + case PRIMITIVE: + final PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector)child; + list = Lists.transform(objectList, new Function() { + @Nullable + @Override + public Object apply(@Nullable Object input) { + return primitiveObjectInspector.getPrimitiveJavaObject(input); + } + }); + break; + default: + break; + } + + return list; + } + + @Override + @JsonProperty + public ParseSpec getParseSpec() + { + return parseSpec; + } + + @JsonProperty + public String getTypeString() + { + return typeString; + } + + @Override + public InputRowParser withParseSpec(ParseSpec parseSpec) + { + return new OrcHadoopInputRowParser(parseSpec, typeString); + } + + public InputRowParser withTypeString(String typeString) + { + return new OrcHadoopInputRowParser(parseSpec, typeString); + } + + public static String typeStringFromParseSpec(ParseSpec parseSpec) + { + StringBuilder builder = new StringBuilder("struct<"); + builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string"); + if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) { + builder.append(","); + builder.append(StringUtils.join(parseSpec.getDimensionsSpec().getDimensionNames(), ":string,")).append(":string"); + } + builder.append(">"); + + return builder.toString(); + } + + public static Properties getTablePropertiesFromStructTypeInfo(StructTypeInfo structTypeInfo) + { + Properties table = new Properties(); + table.setProperty("columns", StringUtils.join(structTypeInfo.getAllStructFieldNames(), ",")); + table.setProperty("columns.types", StringUtils.join( + Lists.transform(structTypeInfo.getAllStructFieldTypeInfos(), + new Function() { + @Nullable + @Override + public String apply(@Nullable TypeInfo typeInfo) { + return typeInfo.getTypeName(); + } + }), + "," + )); + + return table; + } + + @Override + public boolean equals(Object o) + { + if (!(o instanceof OrcHadoopInputRowParser)) + return false; + + OrcHadoopInputRowParser other = (OrcHadoopInputRowParser)o; + + if (!parseSpec.equals(other.parseSpec)) + return false; + + if (!typeString.equals(other.typeString)) + return false; + + return true; + } +} diff --git a/extensions-contrib/orc-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/orc-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule new file mode 100755 index 00000000000..3cf68c5589b --- /dev/null +++ b/extensions-contrib/orc-extensions/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -0,0 +1 @@ +io.druid.data.input.orc.OrcExtensionsModule diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/DruidOrcInputFormatTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/DruidOrcInputFormatTest.java new file mode 100644 index 00000000000..be26c9d6a9d --- /dev/null +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/DruidOrcInputFormatTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.orc; + +import io.druid.data.input.MapBasedInputRow; +import io.druid.indexer.HadoopDruidIndexerConfig; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcStruct; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.apache.orc.Writer; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +public class DruidOrcInputFormatTest +{ + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + String timestamp = "2016-01-01T00:00:00.000Z"; + String col1 = "bar"; + String[] col2 = {"dat1", "dat2", "dat3"}; + double val1 = 1.1; + Job job; + HadoopDruidIndexerConfig config; + File testFile; + Path path; + FileSplit split; + + @Before + public void setUp() throws IOException + { + Configuration conf = new Configuration(); + job = Job.getInstance(conf); + + config = HadoopDruidIndexerConfig.fromFile(new File( + "example/hadoop_orc_job.json")); + + config.intoConfiguration(job); + + testFile = makeOrcFile(); + path = new Path(testFile.getAbsoluteFile().toURI()); + split = new FileSplit(path, 0, testFile.length(), null); + + } + + @Test + public void testRead() throws IOException, InterruptedException + { + InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration()); + + TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = inputFormat.createRecordReader(split, context); + OrcHadoopInputRowParser parser = (OrcHadoopInputRowParser)config.getParser(); + + reader.initialize(split, context); + + reader.nextKeyValue(); + + OrcStruct data = (OrcStruct) reader.getCurrentValue(); + + MapBasedInputRow row = 
(MapBasedInputRow)parser.parse(data); + + Assert.assertTrue(row.getEvent().keySet().size() == 4); + Assert.assertEquals(new DateTime(timestamp), row.getTimestamp()); + Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions()); + Assert.assertEquals(col1, row.getEvent().get("col1")); + Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2")); + + reader.close(); + } + + private File makeOrcFile() throws IOException + { + final File dir = temporaryFolder.newFolder(); + final File testOrc = new File(dir, "test.orc"); + TypeDescription schema = TypeDescription.createStruct() + .addField("timestamp", TypeDescription.createString()) + .addField("col1", TypeDescription.createString()) + .addField("col2", TypeDescription.createList(TypeDescription.createString())) + .addField("val1", TypeDescription.createFloat()); + Configuration conf = new Configuration(); + Writer writer = OrcFile.createWriter( + new Path(testOrc.getPath()), + OrcFile.writerOptions(conf) + .setSchema(schema) + .stripeSize(100000) + .bufferSize(10000) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.CURRENT) + ); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = 1; + ((BytesColumnVector) batch.cols[0]).setRef(0, timestamp.getBytes(), 0, timestamp.length()); + ((BytesColumnVector) batch.cols[1]).setRef(0, col1.getBytes(), 0, col1.length()); + + ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2]; + listColumnVector.childCount = col2.length; + listColumnVector.lengths[0] = 3; + for (int idx = 0; idx < col2.length; idx++) + { + ((BytesColumnVector) listColumnVector.child).setRef(idx, col2[idx].getBytes(), 0, col2[idx].length()); + } + + ((DoubleColumnVector) batch.cols[3]).vector[0] = val1; + writer.addRowBatch(batch); + writer.close(); + + return testOrc; + } +} diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcHadoopInputRowParserTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcHadoopInputRowParserTest.java new file mode 100644 index 00000000000..cc473f4829b --- /dev/null +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcHadoopInputRowParserTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.data.input.orc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.name.Names; +import io.druid.data.input.impl.*; +import io.druid.guice.GuiceInjectors; +import io.druid.initialization.Initialization; +import io.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class OrcHadoopInputRowParserTest +{ + Injector injector; + ObjectMapper mapper = new DefaultObjectMapper(); + + @Before + public void setUp() + { + injector = Initialization.makeInjectorWithModules( + GuiceInjectors.makeStartupInjector(), + ImmutableList.of( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0); + } + }, + new OrcExtensionsModule() + ) + ); + mapper = injector.getInstance(ObjectMapper.class); + } + + @Test + public void testSerde() throws IOException + { + String parserString = "{\n" + + " \"type\": \"orc\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"timeAndDims\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [\n" + + " \"col1\",\n" + + " \"col2\"\n" + + " ],\n" + + " \"dimensionExclusions\": [],\n" + + " \"spatialDimensions\": []\n" + + " }\n" + + " },\n" + + " \"typeString\": \"struct,val1:float>\"\n" + + " }"; + + InputRowParser parser = mapper.readValue(parserString, InputRowParser.class); + InputRowParser expected = new OrcHadoopInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), + new DimensionsSpec( + ImmutableList.of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")), + null, + null + ) + ), + "struct,val1:float>" + ); + + Assert.assertEquals(expected, parser); + } + + @Test + public void testTypeFromParseSpec() + { + ParseSpec parseSpec = new TimeAndDimsParseSpec( + new TimestampSpec( + "timestamp", + "auto", + null + ), + new DimensionsSpec( + ImmutableList.of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")), + null, + null + ) + ); + String typeString = OrcHadoopInputRowParser.typeStringFromParseSpec(parseSpec); + String expected = "struct"; + + Assert.assertEquals(expected, typeString); + } + + +} diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java new file mode 100644 index 00000000000..1f898f93bba --- /dev/null +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -0,0 +1,356 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.data.input.orc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import com.metamx.common.Granularity; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.InputRowParser; +import io.druid.data.input.impl.TimeAndDimsParseSpec; +import io.druid.data.input.impl.TimestampSpec; +import io.druid.granularity.QueryGranularities; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIOConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.HadoopTuningConfig; +import io.druid.indexer.HadoopyShardSpec; +import io.druid.indexer.IndexGeneratorJob; +import io.druid.indexer.JobHelper; +import io.druid.indexer.Jobby; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; +import io.druid.segment.QueryableIndex; +import io.druid.segment.QueryableIndexIndexableAdapter; +import io.druid.segment.Rowboat; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.HashBasedNumberedShardSpec; +import io.druid.timeline.partition.ShardSpec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; +import org.joda.time.DateTime; +import org.joda.time.DateTimeComparator; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +public class OrcIndexGeneratorJobTest +{ + static private final AggregatorFactory[] aggs = { + new LongSumAggregatorFactory("visited_num", "visited_num"), + new HyperUniquesAggregatorFactory("unique_hosts", "host") + }; + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private ObjectMapper mapper; + private HadoopDruidIndexerConfig config; + private final String dataSourceName = "website"; + private final List data = ImmutableList.of( + 
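      // Test rows in "yyyyMMddHH,host,visited_num" form, matching the timestampSpec above and the
      // timestamp/host/visited_num ORC schema written by writeDataToLocalOrcFile() below.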
"2014102200,a.example.com,100", + "2014102200,b.exmaple.com,50", + "2014102200,c.example.com,200", + "2014102200,d.example.com,250", + "2014102200,e.example.com,123", + "2014102200,f.example.com,567", + "2014102200,g.example.com,11", + "2014102200,h.example.com,251", + "2014102200,i.example.com,963", + "2014102200,j.example.com,333", + "2014102212,a.example.com,100", + "2014102212,b.exmaple.com,50", + "2014102212,c.example.com,200", + "2014102212,d.example.com,250", + "2014102212,e.example.com,123", + "2014102212,f.example.com,567", + "2014102212,g.example.com,11", + "2014102212,h.example.com,251", + "2014102212,i.example.com,963", + "2014102212,j.example.com,333" + ); + private final Interval interval = new Interval("2014-10-22T00:00:00Z/P1D"); + private File dataRoot; + private File outputRoot; + private Integer[][][] shardInfoForEachSegment = new Integer[][][]{{ + {0, 4}, + {1, 4}, + {2, 4}, + {3, 4} + }}; + private final InputRowParser inputRowParser = new OrcHadoopInputRowParser( + new TimeAndDimsParseSpec( + new TimestampSpec("timestamp", "yyyyMMddHH", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null) + ), + "struct" + ); + + private File writeDataToLocalOrcFile(File outputDir, List data) throws IOException + { + File outputFile = new File(outputDir, "test.orc"); + TypeDescription schema = TypeDescription.createStruct() + .addField("timestamp", TypeDescription.createString()) + .addField("host", TypeDescription.createString()) + .addField("visited_num", TypeDescription.createInt()); + Configuration conf = new Configuration(); + org.apache.orc.Writer writer = OrcFile.createWriter( + new Path(outputFile.getPath()), + OrcFile.writerOptions(conf) + .setSchema(schema) + .stripeSize(100000) + .bufferSize(10000) + .compress(CompressionKind.ZLIB) + .version(OrcFile.Version.CURRENT) + ); + VectorizedRowBatch batch = schema.createRowBatch(); + batch.size = data.size(); + for (int idx = 0; idx < data.size(); idx++) { + String line = data.get(idx); + String[] lineSplit = line.split(","); + ((BytesColumnVector) batch.cols[0]).setRef(idx, lineSplit[0].getBytes(), 0, lineSplit[0].length()); + ((BytesColumnVector) batch.cols[1]).setRef(idx, lineSplit[1].getBytes(), 0, lineSplit[1].length()); + ((LongColumnVector) batch.cols[2]).vector[idx] = Long.parseLong(lineSplit[2]); + } + writer.addRowBatch(batch); + writer.close(); + + return outputFile; + } + + @Before + public void setUp() throws Exception + { + mapper = HadoopDruidIndexerConfig.JSON_MAPPER; + mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed")); + + dataRoot = temporaryFolder.newFolder("data"); + outputRoot = temporaryFolder.newFolder("output"); + File dataFile = writeDataToLocalOrcFile(dataRoot, data); + + HashMap inputSpec = new HashMap(); + inputSpec.put("paths", dataFile.getCanonicalPath()); + inputSpec.put("type", "static"); + inputSpec.put("inputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"); + + config = new HadoopDruidIndexerConfig( + new HadoopIngestionSpec( + new DataSchema( + dataSourceName, + mapper.convertValue( + inputRowParser, + Map.class + ), + aggs, + new UniformGranularitySpec( + Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(this.interval) + ), + mapper + ), + new HadoopIOConfig( + ImmutableMap.copyOf(inputSpec), + null, + outputRoot.getCanonicalPath() + ), + new HadoopTuningConfig( + outputRoot.getCanonicalPath(), + null, + null, + null, + null, + null, + false, + false, + false, + false, + 
ImmutableMap.of(JobContext.NUM_REDUCES, "0"), //verifies that set num reducers is ignored + false, + true, + null, + true, + null + ) + ) + ); + config.setShardSpecs( + loadShardSpecs(shardInfoForEachSegment) + ); + config = HadoopDruidIndexerConfig.fromSpec(config.getSchema()); + } + + @Test + public void testIndexGeneratorJob() throws IOException + { + verifyJob(new IndexGeneratorJob(config)); + } + + private void verifyJob(IndexGeneratorJob job) throws IOException + { + JobHelper.runJobs(ImmutableList.of(job), config); + + int segmentNum = 0; + for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) { + Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++]; + File segmentOutputFolder = new File( + String.format( + "%s/%s/%s_%s/%s", + config.getSchema().getIOConfig().getSegmentOutputPath(), + config.getSchema().getDataSchema().getDataSource(), + currTime.toString(), + currTime.plusDays(1).toString(), + config.getSchema().getTuningConfig().getVersion() + ) + ); + Assert.assertTrue(segmentOutputFolder.exists()); + Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length); + + int rowCount = 0; + for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) { + File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum)); + Assert.assertTrue(individualSegmentFolder.exists()); + + File descriptor = new File(individualSegmentFolder, "descriptor.json"); + File indexZip = new File(individualSegmentFolder, "index.zip"); + Assert.assertTrue(descriptor.exists()); + Assert.assertTrue(indexZip.exists()); + + DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class); + Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion()); + Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval()); + Assert.assertEquals("local", dataSegment.getLoadSpec().get("type")); + Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path")); + Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion()); + + Assert.assertEquals(dataSourceName, dataSegment.getDataSource()); + Assert.assertTrue(dataSegment.getDimensions().size() == 1); + String[] dimensions = dataSegment.getDimensions().toArray(new String[dataSegment.getDimensions().size()]); + Arrays.sort(dimensions); + Assert.assertEquals("host", dimensions[0]); + Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0)); + Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1)); + + Integer[] hashShardInfo = shardInfo[partitionNum]; + HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec(); + Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum()); + Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions()); + + File dir = Files.createTempDir(); + + unzip(indexZip, dir); + + QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir); + QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index); + + for(Rowboat row: adapter.getRows()) + { + Object[] metrics = row.getMetrics(); + + rowCount++; + Assert.assertTrue(metrics.length == 2); + } + } + Assert.assertEquals(rowCount, data.size()); + } + } + + private Map> loadShardSpecs( + Integer[][][] shardInfoForEachShard + ) + { + Map> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance()); + int shardCount = 0; + int segmentNum = 0; + for (Interval 
segmentGranularity : config.getSegmentGranularIntervals().get()) { + List specs = Lists.newArrayList(); + for (Integer[] shardInfo : shardInfoForEachShard[segmentNum++]) { + specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER)); + } + List actualSpecs = Lists.newArrayListWithExpectedSize(specs.size()); + for (ShardSpec spec : specs) { + actualSpecs.add(new HadoopyShardSpec(spec, shardCount++)); + } + + shardSpecs.put(segmentGranularity.getStart(), actualSpecs); + } + + return shardSpecs; + } + + private void unzip(File zip, File outDir) + { + try { + long size = 0L; + final byte[] buffer = new byte[1 << 13]; + try (ZipInputStream in = new ZipInputStream(new FileInputStream(zip))) { + for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) { + final String fileName = entry.getName(); + try (final OutputStream out = new BufferedOutputStream( + new FileOutputStream( + outDir.getAbsolutePath() + + File.separator + + fileName + ), 1 << 13 + )) { + for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) { + if (len == 0) { + continue; + } + size += len; + out.write(buffer, 0, len); + } + out.flush(); + } + } + } + } + catch (IOException | RuntimeException exception) { + } + } +} diff --git a/pom.xml b/pom.xml index e01684a45cf..9654679a025 100644 --- a/pom.xml +++ b/pom.xml @@ -69,6 +69,7 @@ 1.7.12 2.3.0 + 2.0.0 @@ -106,6 +107,7 @@ extensions-contrib/distinctcount extensions-contrib/parquet-extensions extensions-contrib/statsd-emitter + extensions-contrib/orc-extensions
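
For reference, a minimal standalone sketch (not part of the patch) of how a `typeString` is parsed and validated, mirroring what `OrcHadoopInputRowParser.initialize()` does with Hive's `TypeInfoUtils` before handing column names and types to `OrcSerde`. The `TypeStringCheck` class name and the `col1`/`col2` schema are illustrative only, and `hive-exec` is assumed to be on the classpath:

```java
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeStringCheck
{
  public static void main(String[] args)
  {
    // Same form as the docs example: a string column and an array-of-string column.
    String typeString = "struct<col1:string,col2:array<string>>";

    // Parse the type string the same way the parser does before building the serde table properties.
    TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
    if (!(typeInfo instanceof StructTypeInfo)) {
      throw new IllegalArgumentException("typeString should be a struct type but was: " + typeString);
    }

    StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
    // These two lists correspond to the "columns" and "columns.types" properties the parser builds.
    System.out.println("columns       = " + structTypeInfo.getAllStructFieldNames());
    System.out.println("columns.types = " + structTypeInfo.getAllStructFieldTypeInfos());
  }
}
```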