Hadoop InputRowParser for Orc file (#3019)

* InputRowParser to decode OrcStruct from OrcNewInputFormat

* add unit test for orc hadoop indexing

* update docs and fix test code bug

* doc updated

* resolve maven dependency conflict

* remove unused imports

* fix returned array type from Object[] to the correct primitive array type

* fix to support getDimension() of MapBasedRow: change return type of orc list from array to list

* rebase and update based on comments

* updated based on comments

* reflect review comments

* fix bug in typeStringFromParseSpec() and add unit test

* add license header
Keuntae Park 2016-07-27 01:42:56 +09:00 committed by Fangjin Yang
parent 76fabcfdb2
commit 95a58097e2
11 changed files with 1203 additions and 0 deletions

View File

@@ -0,0 +1,91 @@
---
layout: doc_page
---
# Orc
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`.
This extension enables Druid to ingest and understand the Apache Orc data format offline.
## Orc Hadoop Parser
This is for batch ingestion using the HadoopDruidIndexer. The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"`.
Field | Type | Description | Required
----------|-------------|----------------------------------------------------------------------------------------|---------
type | String | This should say `orc` | yes
parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Any parse spec that extends ParseSpec can be used, but only its TimestampSpec and DimensionsSpec are used. | yes
typeString | String | String representation of the Orc struct type info. If not specified, it is automatically constructed from the parseSpec, but all metric columns are dropped. | no
For example, a string column `col1` and an array-of-string column `col2` are represented by the `typeString` `"struct<col1:string,col2:array<string>>"`.
Currently, only Java primitive types and arrays of Java primitive types are supported; of the compound [ORC types](https://orc.apache.org/docs/types.html), only `list` is supported, and lists of lists are not.
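When `typeString` is omitted, the parser derives one from the parseSpec, typing the timestamp column and every dimension as `string` and dropping metric columns. A minimal sketch of that behavior using `typeStringFromParseSpec` from this extension (the spec values mirror the example below; the standalone class wrapper is illustrative only):

```java
import com.google.common.collect.ImmutableList;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.data.input.orc.OrcHadoopInputRowParser;

public class TypeStringExample
{
  public static void main(String[] args)
  {
    // parseSpec equivalent to the example indexing spec below:
    // timestamp column "time", one string dimension "name"
    TimeAndDimsParseSpec parseSpec = new TimeAndDimsParseSpec(
        new TimestampSpec("time", "auto", null),
        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("name")), null, null)
    );

    // With typeString unspecified, the parser builds one from the parseSpec:
    // every column becomes string, and metric columns are not included.
    System.out.println(OrcHadoopInputRowParser.typeStringFromParseSpec(parseSpec));
    // -> struct<time:string,name:string>
  }
}
```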
An example Hadoop indexing spec:
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
"paths": "/data/path/in/HDFS/"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "tmp/segments"
},
"dataSchema": {
"dataSource": "no_metrics",
"parser": {
"type": "orc",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"name"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
},
"typeString": "struct<time:string,name:string>"
},
"metricsSpec": [{
"type": "count",
"name": "count"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "ALL",
"intervals": ["2015-12-31/2016-01-02"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {},
"leaveIntermediate": true
}
}
}
```
Almost all the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` to make HDFS paths timezone-independent (for example, by passing `-Duser.timezone=UTC` in the MapReduce JVM options, as in the bundled `example/hadoop_orc_job.json`).

View File

@@ -48,6 +48,7 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer).|[link](../development/extensions-contrib/kafka-simple.html)|
|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-contrib/parquet.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|

View File

@@ -0,0 +1,63 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
"paths": "wikipedia.gz.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "wikipedia",
"parser": {
"type": "orc",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"col1",
"col2"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
},
"typeString": "struct<timestamp:string,col1:string,col2:array<string>,val1:float>"
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2015-01-01/2017-01-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@@ -0,0 +1,150 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Druid - a distributed column store.
~ Copyright 2012 - 2015 Metamarkets Group Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-orc-extensions</artifactId>
<name>druid-orc-extensions</name>
<description>druid-orc-extensions</description>
<parent>
<artifactId>druid</artifactId>
<groupId>io.druid</groupId>
<version>0.9.2-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-orc</artifactId>
<version>${hive.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.orc;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.Arrays;
import java.util.List;
public class OrcExtensionsModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules() {
return Arrays.asList(
new SimpleModule("OrcInputRowParserModule")
.registerSubtypes(
new NamedType(OrcHadoopInputRowParser.class, "orc")
)
);
}
@Override
public void configure(Binder binder) {
}
}

View File

@@ -0,0 +1,215 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.orc;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.ParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.lang.reflect.Array;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
{
private final ParseSpec parseSpec;
private String typeString;
private final List<String> dimensions;
private StructObjectInspector oip;
private final OrcSerde serde;
@JsonCreator
public OrcHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("typeString") String typeString
)
{
this.parseSpec = parseSpec;
this.typeString = typeString;
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.serde = new OrcSerde();
initialize();
}
@Override
public InputRow parse(OrcStruct input)
{
Map<String, Object> map = Maps.newHashMap();
List<? extends StructField> fields = oip.getAllStructFieldRefs();
for (StructField field: fields) {
ObjectInspector objectInspector = field.getFieldObjectInspector();
switch(objectInspector.getCategory()) {
case PRIMITIVE:
PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector)objectInspector;
map.put(field.getFieldName(),
primitiveObjectInspector.getPrimitiveJavaObject(oip.getStructFieldData(input, field)));
break;
case LIST: // array case - only 1-depth arrays are supported for now
ListObjectInspector listObjectInspector = (ListObjectInspector)objectInspector;
map.put(field.getFieldName(),
getListObject(listObjectInspector, oip.getStructFieldData(input, field)));
break;
default:
break;
}
}
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(map);
return new MapBasedInputRow(dateTime, dimensions, map);
}
private void initialize()
{
if (typeString == null) {
typeString = typeStringFromParseSpec(parseSpec);
}
TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
Preconditions.checkArgument(typeInfo instanceof StructTypeInfo,
String.format("typeString should be struct type but not [%s]", typeString));
Properties table = getTablePropertiesFromStructTypeInfo((StructTypeInfo)typeInfo);
serde.initialize(new Configuration(), table);
try {
oip = (StructObjectInspector) serde.getObjectInspector();
} catch (SerDeException e) {
e.printStackTrace();
}
}
private List getListObject(ListObjectInspector listObjectInspector, Object listObject)
{
List objectList = listObjectInspector.getList(listObject);
List list = null;
ObjectInspector child = listObjectInspector.getListElementObjectInspector();
switch(child.getCategory()) {
case PRIMITIVE:
final PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector)child;
list = Lists.transform(objectList, new Function() {
@Nullable
@Override
public Object apply(@Nullable Object input) {
return primitiveObjectInspector.getPrimitiveJavaObject(input);
}
});
break;
default:
break;
}
return list;
}
@Override
@JsonProperty
public ParseSpec getParseSpec()
{
return parseSpec;
}
@JsonProperty
public String getTypeString()
{
return typeString;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new OrcHadoopInputRowParser(parseSpec, typeString);
}
public InputRowParser withTypeString(String typeString)
{
return new OrcHadoopInputRowParser(parseSpec, typeString);
}
public static String typeStringFromParseSpec(ParseSpec parseSpec)
{
StringBuilder builder = new StringBuilder("struct<");
builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string");
if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) {
builder.append(",");
builder.append(StringUtils.join(parseSpec.getDimensionsSpec().getDimensionNames(), ":string,")).append(":string");
}
builder.append(">");
return builder.toString();
}
public static Properties getTablePropertiesFromStructTypeInfo(StructTypeInfo structTypeInfo)
{
Properties table = new Properties();
table.setProperty("columns", StringUtils.join(structTypeInfo.getAllStructFieldNames(), ","));
table.setProperty("columns.types", StringUtils.join(
Lists.transform(structTypeInfo.getAllStructFieldTypeInfos(),
new Function<TypeInfo, String>() {
@Nullable
@Override
public String apply(@Nullable TypeInfo typeInfo) {
return typeInfo.getTypeName();
}
}),
","
));
return table;
}
@Override
public boolean equals(Object o)
{
if (!(o instanceof OrcHadoopInputRowParser))
return false;
OrcHadoopInputRowParser other = (OrcHadoopInputRowParser)o;
if (!parseSpec.equals(other.parseSpec))
return false;
if (!typeString.equals(other.typeString))
return false;
return true;
}
}

View File

@@ -0,0 +1 @@
io.druid.data.input.orc.OrcExtensionsModule

View File

@@ -0,0 +1,149 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.orc;
import io.druid.data.input.MapBasedInputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
public class DruidOrcInputFormatTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
String timestamp = "2016-01-01T00:00:00.000Z";
String col1 = "bar";
String[] col2 = {"dat1", "dat2", "dat3"};
double val1 = 1.1;
Job job;
HadoopDruidIndexerConfig config;
File testFile;
Path path;
FileSplit split;
@Before
public void setUp() throws IOException
{
Configuration conf = new Configuration();
job = Job.getInstance(conf);
config = HadoopDruidIndexerConfig.fromFile(new File(
"example/hadoop_orc_job.json"));
config.intoConfiguration(job);
testFile = makeOrcFile();
path = new Path(testFile.getAbsoluteFile().toURI());
split = new FileSplit(path, 0, testFile.length(), null);
}
@Test
public void testRead() throws IOException, InterruptedException
{
InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
OrcHadoopInputRowParser parser = (OrcHadoopInputRowParser)config.getParser();
reader.initialize(split, context);
reader.nextKeyValue();
OrcStruct data = (OrcStruct) reader.getCurrentValue();
MapBasedInputRow row = (MapBasedInputRow)parser.parse(data);
Assert.assertTrue(row.getEvent().keySet().size() == 4);
Assert.assertEquals(new DateTime(timestamp), row.getTimestamp());
Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
Assert.assertEquals(col1, row.getEvent().get("col1"));
Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
reader.close();
}
private File makeOrcFile() throws IOException
{
final File dir = temporaryFolder.newFolder();
final File testOrc = new File(dir, "test.orc");
TypeDescription schema = TypeDescription.createStruct()
.addField("timestamp", TypeDescription.createString())
.addField("col1", TypeDescription.createString())
.addField("col2", TypeDescription.createList(TypeDescription.createString()))
.addField("val1", TypeDescription.createFloat());
Configuration conf = new Configuration();
Writer writer = OrcFile.createWriter(
new Path(testOrc.getPath()),
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
.bufferSize(10000)
.compress(CompressionKind.ZLIB)
.version(OrcFile.Version.CURRENT)
);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1;
((BytesColumnVector) batch.cols[0]).setRef(0, timestamp.getBytes(), 0, timestamp.length());
((BytesColumnVector) batch.cols[1]).setRef(0, col1.getBytes(), 0, col1.length());
ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
listColumnVector.childCount = col2.length;
listColumnVector.lengths[0] = 3;
for (int idx = 0; idx < col2.length; idx++)
{
((BytesColumnVector) listColumnVector.child).setRef(idx, col2[idx].getBytes(), 0, col2[idx].length());
}
((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
writer.addRowBatch(batch);
writer.close();
return testOrc;
}
}

View File

@@ -0,0 +1,129 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.orc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import io.druid.data.input.impl.*;
import io.druid.guice.GuiceInjectors;
import io.druid.initialization.Initialization;
import io.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
public class OrcHadoopInputRowParserTest
{
Injector injector;
ObjectMapper mapper = new DefaultObjectMapper();
@Before
public void setUp()
{
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
}
},
new OrcExtensionsModule()
)
);
mapper = injector.getInstance(ObjectMapper.class);
}
@Test
public void testSerde() throws IOException
{
String parserString = "{\n" +
" \"type\": \"orc\",\n" +
" \"parseSpec\": {\n" +
" \"format\": \"timeAndDims\",\n" +
" \"timestampSpec\": {\n" +
" \"column\": \"timestamp\",\n" +
" \"format\": \"auto\"\n" +
" },\n" +
" \"dimensionsSpec\": {\n" +
" \"dimensions\": [\n" +
" \"col1\",\n" +
" \"col2\"\n" +
" ],\n" +
" \"dimensionExclusions\": [],\n" +
" \"spatialDimensions\": []\n" +
" }\n" +
" },\n" +
" \"typeString\": \"struct<timestamp:string,col1:string,col2:array<string>,val1:float>\"\n" +
" }";
InputRowParser parser = mapper.readValue(parserString, InputRowParser.class);
InputRowParser expected = new OrcHadoopInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
ImmutableList.<DimensionSchema>of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")),
null,
null
)
),
"struct<timestamp:string,col1:string,col2:array<string>,val1:float>"
);
Assert.assertEquals(expected, parser);
}
@Test
public void testTypeFromParseSpec()
{
ParseSpec parseSpec = new TimeAndDimsParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
ImmutableList.<DimensionSchema>of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")),
null,
null
)
);
String typeString = OrcHadoopInputRowParser.typeStringFromParseSpec(parseSpec);
String expected = "struct<timestamp:string,col1:string,col2:string>";
Assert.assertEquals(expected, typeString);
}
}

View File

@@ -0,0 +1,356 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.data.input.orc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.metamx.common.Granularity;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.TimeAndDimsParseSpec;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularities;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.indexer.HadoopyShardSpec;
import io.druid.indexer.IndexGeneratorJob;
import io.druid.indexer.JobHelper;
import io.druid.indexer.Jobby;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexIndexableAdapter;
import io.druid.segment.Rowboat;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class OrcIndexGeneratorJobTest
{
static private final AggregatorFactory[] aggs = {
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
};
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private ObjectMapper mapper;
private HadoopDruidIndexerConfig config;
private final String dataSourceName = "website";
private final List<String> data = ImmutableList.of(
"2014102200,a.example.com,100",
"2014102200,b.exmaple.com,50",
"2014102200,c.example.com,200",
"2014102200,d.example.com,250",
"2014102200,e.example.com,123",
"2014102200,f.example.com,567",
"2014102200,g.example.com,11",
"2014102200,h.example.com,251",
"2014102200,i.example.com,963",
"2014102200,j.example.com,333",
"2014102212,a.example.com,100",
"2014102212,b.exmaple.com,50",
"2014102212,c.example.com,200",
"2014102212,d.example.com,250",
"2014102212,e.example.com,123",
"2014102212,f.example.com,567",
"2014102212,g.example.com,11",
"2014102212,h.example.com,251",
"2014102212,i.example.com,963",
"2014102212,j.example.com,333"
);
private final Interval interval = new Interval("2014-10-22T00:00:00Z/P1D");
private File dataRoot;
private File outputRoot;
private Integer[][][] shardInfoForEachSegment = new Integer[][][]{{
{0, 4},
{1, 4},
{2, 4},
{3, 4}
}};
private final InputRowParser inputRowParser = new OrcHadoopInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null)
),
"struct<timestamp:string,host:string,visited_num:int>"
);
private File writeDataToLocalOrcFile(File outputDir, List<String> data) throws IOException
{
File outputFile = new File(outputDir, "test.orc");
TypeDescription schema = TypeDescription.createStruct()
.addField("timestamp", TypeDescription.createString())
.addField("host", TypeDescription.createString())
.addField("visited_num", TypeDescription.createInt());
Configuration conf = new Configuration();
org.apache.orc.Writer writer = OrcFile.createWriter(
new Path(outputFile.getPath()),
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
.bufferSize(10000)
.compress(CompressionKind.ZLIB)
.version(OrcFile.Version.CURRENT)
);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = data.size();
for (int idx = 0; idx < data.size(); idx++) {
String line = data.get(idx);
String[] lineSplit = line.split(",");
((BytesColumnVector) batch.cols[0]).setRef(idx, lineSplit[0].getBytes(), 0, lineSplit[0].length());
((BytesColumnVector) batch.cols[1]).setRef(idx, lineSplit[1].getBytes(), 0, lineSplit[1].length());
((LongColumnVector) batch.cols[2]).vector[idx] = Long.parseLong(lineSplit[2]);
}
writer.addRowBatch(batch);
writer.close();
return outputFile;
}
@Before
public void setUp() throws Exception
{
mapper = HadoopDruidIndexerConfig.JSON_MAPPER;
mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
dataRoot = temporaryFolder.newFolder("data");
outputRoot = temporaryFolder.newFolder("output");
File dataFile = writeDataToLocalOrcFile(dataRoot, data);
HashMap<String, Object> inputSpec = new HashMap<String, Object>();
inputSpec.put("paths", dataFile.getCanonicalPath());
inputSpec.put("type", "static");
inputSpec.put("inputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat");
config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
dataSourceName,
mapper.convertValue(
inputRowParser,
Map.class
),
aggs,
new UniformGranularitySpec(
Granularity.DAY, QueryGranularities.NONE, ImmutableList.of(this.interval)
),
mapper
),
new HadoopIOConfig(
ImmutableMap.copyOf(inputSpec),
null,
outputRoot.getCanonicalPath()
),
new HadoopTuningConfig(
outputRoot.getCanonicalPath(),
null,
null,
null,
null,
null,
false,
false,
false,
false,
ImmutableMap.of(JobContext.NUM_REDUCES, "0"), // verifies that an explicitly set number of reducers is ignored
false,
true,
null,
true,
null
)
)
);
config.setShardSpecs(
loadShardSpecs(shardInfoForEachSegment)
);
config = HadoopDruidIndexerConfig.fromSpec(config.getSchema());
}
@Test
public void testIndexGeneratorJob() throws IOException
{
verifyJob(new IndexGeneratorJob(config));
}
private void verifyJob(IndexGeneratorJob job) throws IOException
{
JobHelper.runJobs(ImmutableList.<Jobby>of(job), config);
int segmentNum = 0;
for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) {
Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
File segmentOutputFolder = new File(
String.format(
"%s/%s/%s_%s/%s",
config.getSchema().getIOConfig().getSegmentOutputPath(),
config.getSchema().getDataSchema().getDataSource(),
currTime.toString(),
currTime.plusDays(1).toString(),
config.getSchema().getTuningConfig().getVersion()
)
);
Assert.assertTrue(segmentOutputFolder.exists());
Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length);
int rowCount = 0;
for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) {
File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum));
Assert.assertTrue(individualSegmentFolder.exists());
File descriptor = new File(individualSegmentFolder, "descriptor.json");
File indexZip = new File(individualSegmentFolder, "index.zip");
Assert.assertTrue(descriptor.exists());
Assert.assertTrue(indexZip.exists());
DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
Assert.assertEquals(dataSourceName, dataSegment.getDataSource());
Assert.assertTrue(dataSegment.getDimensions().size() == 1);
String[] dimensions = dataSegment.getDimensions().toArray(new String[dataSegment.getDimensions().size()]);
Arrays.sort(dimensions);
Assert.assertEquals("host", dimensions[0]);
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Integer[] hashShardInfo = shardInfo[partitionNum];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions());
File dir = Files.createTempDir();
unzip(indexZip, dir);
QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir);
QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
for(Rowboat row: adapter.getRows())
{
Object[] metrics = row.getMetrics();
rowCount++;
Assert.assertTrue(metrics.length == 2);
}
}
Assert.assertEquals(rowCount, data.size());
}
}
private Map<DateTime, List<HadoopyShardSpec>> loadShardSpecs(
Integer[][][] shardInfoForEachShard
)
{
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
int segmentNum = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
List<ShardSpec> specs = Lists.newArrayList();
for (Integer[] shardInfo : shardInfoForEachShard[segmentNum++]) {
specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER));
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (ShardSpec spec : specs) {
actualSpecs.add(new HadoopyShardSpec(spec, shardCount++));
}
shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
}
return shardSpecs;
}
private void unzip(File zip, File outDir)
{
try {
long size = 0L;
final byte[] buffer = new byte[1 << 13];
try (ZipInputStream in = new ZipInputStream(new FileInputStream(zip))) {
for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
final String fileName = entry.getName();
try (final OutputStream out = new BufferedOutputStream(
new FileOutputStream(
outDir.getAbsolutePath()
+ File.separator
+ fileName
), 1 << 13
)) {
for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
if (len == 0) {
continue;
}
size += len;
out.write(buffer, 0, len);
}
out.flush();
}
}
}
}
catch (IOException | RuntimeException exception) {
}
}
}

View File

@@ -69,6 +69,7 @@
<slf4j.version>1.7.12</slf4j.version>
<!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
<hadoop.compile.version>2.3.0</hadoop.compile.version>
<hive.version>2.0.0</hive.version>
</properties>
<modules>
@@ -106,6 +107,7 @@
<module>extensions-contrib/distinctcount</module>
<module>extensions-contrib/parquet-extensions</module>
<module>extensions-contrib/statsd-emitter</module>
<module>extensions-contrib/orc-extensions</module>
</modules>
<dependencyManagement>