'core' ORC extension (#7138)

* orc extension reworked to use apache orc map-reduce lib, moved to core extensions, support for flattenSpec, tests, docs

* change binary handling to be compatible with avro and parquet, Rows.objectToStrings now converts byte[] to base64, change date handling

* better docs and tests

* fix it

* formatting

* doc fix

* fix it

* exclude redundant dependencies

* use latest orc-mapreduce, add hadoop jobProperties recommendations to docs

* doc fix

* review stuff and fix binaryAsString

* cache for root level fields

* more better
Clint Wylie 2019-04-09 09:03:26 -07:00 committed by Gian Merlino
parent 09abc4a856
commit 89bb43f382
33 changed files with 1755 additions and 1596 deletions

View File

@@ -66,6 +66,9 @@ public final class Rows
} else if (inputValue instanceof List) {
// guava's toString function fails on null objects, so please do not use it
return ((List<?>) inputValue).stream().map(String::valueOf).collect(Collectors.toList());
} else if (inputValue instanceof byte[]) {
// convert byte[] to base64 encoded string
return Collections.singletonList(StringUtils.encodeBase64String((byte[]) inputValue));
} else {
return Collections.singletonList(String.valueOf(inputValue));
}
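To illustrate the new `byte[]` handling in isolation, here is a minimal, hypothetical sketch that mirrors the branch added above; `java.util.Base64` stands in for Druid's `StringUtils.encodeBase64String`, which is assumed to produce standard base64 output, and the class name is made up for the example:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.List;

public class ByteArrayToBase64Example
{
  // Simplified stand-in for the relevant branches of Rows.objectToStrings:
  // byte[] values are emitted as a single base64 string instead of the
  // unhelpful default String.valueOf() rendering (e.g. "[B@1a2b3c").
  static List<String> objectToStrings(Object inputValue)
  {
    if (inputValue instanceof byte[]) {
      return Collections.singletonList(Base64.getEncoder().encodeToString((byte[]) inputValue));
    }
    return Collections.singletonList(String.valueOf(inputValue));
  }

  public static void main(String[] args)
  {
    byte[] raw = "druid".getBytes(StandardCharsets.UTF_8);
    System.out.println(objectToStrings(raw)); // prints [ZHJ1aWQ=]
  }
}
```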

View File

@@ -170,6 +170,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-orc-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-parquet-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:postgresql-metadata-storage</argument>
@@ -318,8 +320,6 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-opentsdb-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-orc-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-rabbitmq</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-redis-cache</argument>

View File

@@ -1,113 +0,0 @@
---
layout: doc_page
title: "ORC"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# ORC
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`.
This extension enables Druid to ingest and understand the Apache ORC data format offline.
## ORC Hadoop Parser
This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inputSpec in ioConfig must be set to `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"`.
|Field | Type | Description | Required|
|----------|-------------|----------------------------------------------------------------------------------------|---------|
|type | String | This should say `orc` | yes|
|parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Any parse spec that extends ParseSpec is possible but only their TimestampSpec and DimensionsSpec are used. | yes|
|typeString| String | String representation of the ORC struct type info. If not specified, it is automatically constructed from the parseSpec, but all metric columns are dropped | no|
|mapFieldNameFormat| String | String format for resolving flattened map fields. Default is `<PARENT>_<CHILD>`. | no |
As an example of `typeString`, a string column `col1` and an array-of-strings column `col2` are represented by `"struct<col1:string,col2:array<string>>"`.
Currently, only Java primitive types, arrays of Java primitive types, and maps of Java primitive types are supported. Thus, the compound [ORC types](https://orc.apache.org/docs/types.html) 'list' and 'map' are supported, but a list of lists is not, nor are maps of compound types. For map types, values are exploded into several columns whose names are resolved via `mapFieldNameFormat`; for example, with the default format, a map column `nestedData` with key `dim1` becomes a column named `nestedData_dim1`.
An example Hadoop indexing spec:
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
"paths": "/data/path/in/HDFS/"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "tmp/segments"
},
"dataSchema": {
"dataSource": "no_metrics",
"parser": {
"type": "orc",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"name"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
},
"typeString": "struct<time:string,name:string>",
"mapFieldNameFormat": "<PARENT>_<CHILD>"
},
"metricsSpec": [{
"type": "count",
"name": "count"
}],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "ALL",
"intervals": ["2015-12-31/2016-01-02"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {},
"leaveIntermediate": true
}
}
}
```
Almost all of the fields listed above are required, including `inputFormat` and `metadataUpdateSpec` (`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` so that HDFS paths do not depend on the timezone.

View File

@@ -0,0 +1,311 @@
---
layout: doc_page
title: "Druid ORC Extension"
---
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
# Druid ORC Extension
This module extends [Druid Hadoop based indexing](../../ingestion/hadoop.html) to ingest data directly from offline
Apache ORC files.
To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`.
## ORC Hadoop Parser
The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.orc.mapreduce.OrcInputFormat"`.
|Field | Type | Description | Required|
|----------|-------------|----------------------------------------------------------------------------------------|---------|
|type | String | This should say `orc` | yes|
|parseSpec | JSON Object | Specifies the timestamp and dimensions of the data (`timeAndDims` and `orc` format) and a `flattenSpec` (`orc` format) | yes|
The parser supports two `parseSpec` formats: `orc` and `timeAndDims`.
`orc` supports auto field discovery and flattening, if specified with a [flattenSpec](../../ingestion/flatten-json.html).
If no `flattenSpec` is specified, `useFieldDiscovery` will be enabled by default. Specifying a `dimensionsSpec` is
optional if `useFieldDiscovery` is enabled: if a `dimensionsSpec` is supplied, the list of `dimensions` it defines will be
the set of ingested dimensions; if it is missing, the discovered fields will make up the list.
The `timeAndDims` parseSpec must specify which fields to extract as dimensions through its `dimensionsSpec`.
[All column types](https://orc.apache.org/docs/types.html) are supported, with the exception of `union` types. Columns of
`list` type, if filled with primitives, may be used as a multi-value dimension, or specific elements can be extracted with
`flattenSpec` expressions. Likewise, primitive fields may be extracted from `map` and `struct` types in the same manner.
Auto field discovery will automatically create a string dimension for every (non-timestamp) primitive or `list` of
primitives, as well as for any flatten expressions defined in the `flattenSpec`.
### Hadoop Job Properties
As with most Hadoop jobs, you will get the best results by adding `"mapreduce.job.user.classpath.first": "true"` or
`"mapreduce.job.classloader": "true"` to the `jobProperties` section of `tuningConfig`. Note that if you use
`"mapreduce.job.classloader": "true"`, you will likely also need to set `mapreduce.job.classloader.system.classes` to include
`-org.apache.hadoop.hive.`, which instructs Hadoop to load `org.apache.hadoop.hive` classes from the application jars instead
of the system jars, e.g.
```json
...
"mapreduce.job.classloader": "true",
"mapreduce.job.classloader.system.classes" : "java., javax.accessibility., javax.activation., javax.activity., javax.annotation., javax.annotation.processing., javax.crypto., javax.imageio., javax.jws., javax.lang.model., -javax.management.j2ee., javax.management., javax.naming., javax.net., javax.print., javax.rmi., javax.script., -javax.security.auth.message., javax.security.auth., javax.security.cert., javax.security.sasl., javax.sound., javax.sql., javax.swing., javax.tools., javax.transaction., -javax.xml.registry., -javax.xml.rpc., javax.xml., org.w3c.dom., org.xml.sax., org.apache.commons.logging., org.apache.log4j., -org.apache.hadoop.hbase., -org.apache.hadoop.hive., org.apache.hadoop., core-default.xml, hdfs-default.xml, mapred-default.xml, yarn-default.xml",
...
```
This is needed because the `orc-mapreduce` library depends on `hive-storage-api`, which provides some classes under
the `org.apache.hadoop.hive` package. If you instead use the setting `"mapreduce.job.user.classpath.first": "true"`,
this is not an issue.
### Examples
#### `orc` parser, `orc` parseSpec, auto field discovery, flatten expressions
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "path/to/file.orc"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nestedDim",
"expr": "$.nestedData.dim1"
},
{
"type": "path",
"name": "listDimFirstItem",
"expr": "$.listDim[1]"
}
]
},
"timestampSpec": {
"column": "timestamp",
"format": "millis"
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
}
```
#### `orc` parser, `orc` parseSpec, field discovery with no flattenSpec or dimensionSpec
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "path/to/file.orc"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"timestampSpec": {
"column": "timestamp",
"format": "millis"
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
}
```
#### `orc` parser, `orc` parseSpec, no autodiscovery
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "path/to/file.orc"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": false,
"fields": [
{
"type": "path",
"name": "nestedDim",
"expr": "$.nestedData.dim1"
},
{
"type": "path",
"name": "listDimFirstItem",
"expr": "$.listDim[1]"
}
]
},
"timestampSpec": {
"column": "timestamp",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
"dim1",
"dim3",
"nestedDim",
"listDimFirstItem"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
}
```
#### `orc` parser, `timeAndDims` parseSpec
```json
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "path/to/file.orc"
},
...
},
"dataSchema": {
"dataSource": "example",
"parser": {
"type": "orc",
"parseSpec": {
"format": "timeAndDims",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"dim1",
"dim2",
"dim3",
"listDim"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
...
},
"tuningConfig": <hadoop-tuning-config>
}
}
```
### Migration from 'contrib' extension
This extension, first available in version 0.15.0, replaces the previous 'contrib' extension, which was available until
0.14.0-incubating. While this extension can index any data the 'contrib' extension could, the JSON spec for the
ingestion task is *incompatible* and will need to be modified to work with the newer 'core' extension.
To migrate to 0.15.0+:
* In `inputSpec` of `ioConfig`, `inputFormat` must be changed from `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"` to
`"org.apache.orc.mapreduce.OrcInputFormat"`
* The 'contrib' extension supported a `typeString` property, which provided the schema of the
ORC file. The types it declared essentially had to be correct, but notably _not_ the column names, which
made column renaming possible. In the 'core' extension, column renaming can be achieved with
[`flattenSpec` expressions](../../ingestion/flatten-json.html). For example, `"typeString":"struct<time:string,name:string>"`
used with a file whose actual schema is `struct<_col0:string,_col1:string>` would, to preserve the Druid schema, need to be replaced with:
```json
"flattenSpec": {
"fields": [
{
"type": "path",
"name": "time",
"expr": "$._col0"
},
{
"type": "path",
"name": "name",
"expr": "$._col1"
}
]
...
}
```
* The 'contrib' extension supported a `mapFieldNameFormat` property, which provided a way to flatten `OrcMap` columns
with primitive-typed values into dimensions. This functionality has also been replaced with
[`flattenSpec` expressions](../../ingestion/flatten-json.html). For example, `"mapFieldNameFormat": "<PARENT>_<CHILD>"`
producing a dimension `nestedData_dim1` could, to preserve the Druid schema, be replaced with:
```json
"flattenSpec": {
"fields": [
{
"type": "path",
"name": "nestedData_dim1",
"expr": "$.nestedData.dim1"
}
]
...
}
```

View File

@@ -55,6 +55,7 @@ Core extensions are maintained by Druid committers.
|druid-kerberos|Kerberos authentication for druid processes.|[link](../development/extensions-core/druid-kerberos.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup needs to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)|
|druid-orc-extensions|Support for data in Apache ORC data format.|[link](../development/extensions-core/orc.html)|
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-core/parquet.html)|
|druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
@@ -83,7 +84,6 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer)(deprecated).|[link](../development/extensions-contrib/kafka-simple.html)|
|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|

View File

@@ -1,318 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.druid.extensions.contrib</groupId>
<artifactId>druid-orc-extensions</artifactId>
<name>druid-orc-extensions</name>
<description>druid-orc-extensions</description>
<parent>
<artifactId>druid</artifactId>
<groupId>org.apache.druid</groupId>
<version>0.15.0-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.compile.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-ant</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-vector-code-gen</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service-rpc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-llap-tez</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-shims</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-spark-client</artifactId>
</exclusion>
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>javolution</groupId>
<artifactId>javolution</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>antlr-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.antlr</groupId>
<artifactId>ST4</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ant</groupId>
<artifactId>ant</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-registry</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>orc-tools</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.ivy</groupId>
<artifactId>ivy</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>apache-curator</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.jodd</groupId>
<artifactId>jodd-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.datanucleus</groupId>
<artifactId>datanucleus-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-druid</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-avatica</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.googlecode.javaewah</groupId>
<artifactId>JavaEWAH</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>com.tdunning</groupId>
<artifactId>json</artifactId>
</exclusion>
<exclusion>
<groupId>stax</groupId>
<artifactId>stax-api</artifactId>
</exclusion>
<exclusion>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hive</groupId>
<artifactId>hive-standalone-metastore-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>oro</groupId>
<artifactId>oro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</exclusion>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -1,316 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
{
static final String MAP_CHILD_TAG = "<CHILD>";
static final String MAP_PARENT_TAG = "<PARENT>";
static final String DEFAULT_MAP_FIELD_NAME_FORMAT = MAP_PARENT_TAG + "_" + MAP_CHILD_TAG;
private final ParseSpec parseSpec;
private final String typeString;
private final String mapFieldNameFormat;
private final String mapParentFieldNameFormat;
private final List<String> dimensions;
private final StructObjectInspector oip;
@JsonCreator
public OrcHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@JsonProperty("typeString") String typeString,
@JsonProperty("mapFieldNameFormat") String mapFieldNameFormat
)
{
this.parseSpec = parseSpec;
this.typeString = typeString == null ? typeStringFromParseSpec(parseSpec) : typeString;
this.mapFieldNameFormat =
mapFieldNameFormat == null ||
!mapFieldNameFormat.contains(MAP_PARENT_TAG) ||
!mapFieldNameFormat.contains(MAP_CHILD_TAG) ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
this.mapParentFieldNameFormat = StringUtils.replace(this.mapFieldNameFormat, MAP_PARENT_TAG, "%s");
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.oip = makeObjectInspector(this.typeString);
}
@SuppressWarnings("ArgumentParameterSwap")
@Override
public List<InputRow> parseBatch(OrcStruct input)
{
Map<String, Object> map = new HashMap<>();
List<? extends StructField> fields = oip.getAllStructFieldRefs();
for (StructField field : fields) {
ObjectInspector objectInspector = field.getFieldObjectInspector();
switch (objectInspector.getCategory()) {
case PRIMITIVE:
PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector) objectInspector;
map.put(
field.getFieldName(),
coercePrimitiveObject(
primitiveObjectInspector,
oip.getStructFieldData(input, field)
)
);
break;
case LIST: // array case - only 1-depth array supported yet
ListObjectInspector listObjectInspector = (ListObjectInspector) objectInspector;
map.put(
field.getFieldName(),
getListObject(listObjectInspector, oip.getStructFieldData(input, field))
);
break;
case MAP:
MapObjectInspector mapObjectInspector = (MapObjectInspector) objectInspector;
getMapObject(field.getFieldName(), mapObjectInspector, oip.getStructFieldData(input, field), map);
break;
default:
break;
}
}
TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(map);
final List<String> dimensions;
if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions;
} else {
dimensions = Lists.newArrayList(
Sets.difference(map.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
);
}
return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map));
}
private List getListObject(ListObjectInspector listObjectInspector, Object listObject)
{
if (listObjectInspector.getListLength(listObject) < 0) {
return null;
}
List<?> objectList = listObjectInspector.getList(listObject);
List<?> list = null;
ObjectInspector child = listObjectInspector.getListElementObjectInspector();
switch (child.getCategory()) {
case PRIMITIVE:
final PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector) child;
list = objectList.stream()
.map(input -> coercePrimitiveObject(primitiveObjectInspector, input))
.collect(Collectors.toList());
break;
default:
break;
}
return list;
}
private void getMapObject(String parentName, MapObjectInspector mapObjectInspector, Object mapObject, Map<String, Object> parsedMap)
{
if (mapObjectInspector.getMapSize(mapObject) < 0) {
return;
}
String mapChildFieldNameFormat = StringUtils.replace(
StringUtils.format(mapParentFieldNameFormat, parentName),
MAP_CHILD_TAG,
"%s"
);
Map objectMap = mapObjectInspector.getMap(mapObject);
PrimitiveObjectInspector key = (PrimitiveObjectInspector) mapObjectInspector.getMapKeyObjectInspector();
PrimitiveObjectInspector value = (PrimitiveObjectInspector) mapObjectInspector.getMapValueObjectInspector();
objectMap.forEach((k, v) -> {
String resolvedFieldName = StringUtils.format(mapChildFieldNameFormat, key.getPrimitiveJavaObject(k).toString());
parsedMap.put(resolvedFieldName, value.getPrimitiveJavaObject(v));
});
}
@JsonProperty
public String getMapFieldNameFormat()
{
return mapFieldNameFormat;
}
@Override
@JsonProperty
public ParseSpec getParseSpec()
{
return parseSpec;
}
@JsonProperty
public String getTypeString()
{
return typeString;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new OrcHadoopInputRowParser(parseSpec, typeString, null);
}
@Override
public boolean equals(final Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final OrcHadoopInputRowParser that = (OrcHadoopInputRowParser) o;
return Objects.equals(parseSpec, that.parseSpec) &&
Objects.equals(typeString, that.typeString);
}
@Override
public int hashCode()
{
return Objects.hash(parseSpec, typeString);
}
@Override
public String toString()
{
return "OrcHadoopInputRowParser{" +
"parseSpec=" + parseSpec +
", typeString='" + typeString + '\'' +
'}';
}
@VisibleForTesting
static String typeStringFromParseSpec(ParseSpec parseSpec)
{
StringBuilder builder = new StringBuilder("struct<");
builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string");
// the typeString seems positionally dependent, so repeated timestamp column causes incorrect mapping
if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) {
builder.append(
parseSpec
.getDimensionsSpec()
.getDimensionNames()
.stream()
.filter(s -> !s.equals(parseSpec.getTimestampSpec().getTimestampColumn()))
.collect(Collectors.joining(":string,", ",", ":string"))
);
}
builder.append(">");
return builder.toString();
}
private static Object coercePrimitiveObject(final PrimitiveObjectInspector inspector, final Object object)
{
if (object instanceof HiveDecimalWritable) {
// inspector on HiveDecimal rounds off to integer for some reason.
return ((HiveDecimalWritable) object).getHiveDecimal().doubleValue();
} else if (object instanceof DateWritable) {
return object.toString();
} else {
return inspector.getPrimitiveJavaObject(object);
}
}
private static StructObjectInspector makeObjectInspector(final String typeString)
{
final OrcSerde serde = new OrcSerde();
TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
Preconditions.checkArgument(
typeInfo instanceof StructTypeInfo,
StringUtils.format("typeString should be struct type but not [%s]", typeString)
);
Properties table = getTablePropertiesFromStructTypeInfo((StructTypeInfo) typeInfo);
serde.initialize(new Configuration(), table);
try {
return (StructObjectInspector) serde.getObjectInspector();
}
catch (SerDeException e) {
throw new RuntimeException(e);
}
}
private static Properties getTablePropertiesFromStructTypeInfo(StructTypeInfo structTypeInfo)
{
Properties table = new Properties();
table.setProperty("columns", String.join(",", structTypeInfo.getAllStructFieldNames()));
table.setProperty("columns.types", String.join(
",",
Lists.transform(
structTypeInfo.getAllStructFieldTypeInfos(),
new Function<TypeInfo, String>()
{
@Nullable
@Override
public String apply(@Nullable TypeInfo typeInfo)
{
return typeInfo.getTypeName();
}
}
)
));
return table;
}
}

View File

@@ -1,242 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class DruidOrcInputFormatTest
{
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
String timestamp = "2016-01-01T00:00:00.000Z";
String col1 = "bar";
String[] col2 = {"dat1", "dat2", "dat3"};
double val1 = 1.1;
Job job;
HadoopDruidIndexerConfig config;
File testFile;
Path path;
FileSplit split;
@Before
public void setUp() throws IOException
{
Configuration conf = new Configuration();
job = Job.getInstance(conf);
config = HadoopDruidIndexerConfig.fromFile(new File(
"example/hadoop_orc_job.json"));
config.intoConfiguration(job);
testFile = makeOrcFile();
path = new Path(testFile.getAbsoluteFile().toURI());
split = new FileSplit(path, 0, testFile.length(), null);
}
@Test
public void testRead() throws IOException, InterruptedException
{
InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
InputRowParser<OrcStruct> parser = (InputRowParser<OrcStruct>) config.getParser();
reader.initialize(split, context);
reader.nextKeyValue();
OrcStruct data = (OrcStruct) reader.getCurrentValue();
MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
Assert.assertTrue(row.getEvent().keySet().size() == 4);
Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());
Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
Assert.assertEquals(col1, row.getEvent().get("col1"));
Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
reader.close();
}
@Test
public void testReadDateColumn() throws IOException, InterruptedException
{
File testFile2 = makeOrcFileWithDate();
Path path = new Path(testFile2.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile2.length(), null);
InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
RecordReader reader = inputFormat.createRecordReader(split, context);
InputRowParser<OrcStruct> parser = (InputRowParser<OrcStruct>) config.getParser();
reader.initialize(split, context);
reader.nextKeyValue();
OrcStruct data = (OrcStruct) reader.getCurrentValue();
MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
Assert.assertTrue(row.getEvent().keySet().size() == 4);
Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());
Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
Assert.assertEquals(col1, row.getEvent().get("col1"));
Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
reader.close();
}
private File makeOrcFile() throws IOException
{
final File dir = temporaryFolder.newFolder();
final File testOrc = new File(dir, "test.orc");
TypeDescription schema = TypeDescription.createStruct()
.addField("timestamp", TypeDescription.createString())
.addField("col1", TypeDescription.createString())
.addField("col2", TypeDescription.createList(TypeDescription.createString()))
.addField("val1", TypeDescription.createFloat());
Configuration conf = new Configuration();
Writer writer = OrcFile.createWriter(
new Path(testOrc.getPath()),
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
.bufferSize(10000)
.compress(CompressionKind.ZLIB)
.version(OrcFile.Version.CURRENT)
);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1;
((BytesColumnVector) batch.cols[0]).setRef(
0,
StringUtils.toUtf8(timestamp),
0,
timestamp.length()
);
((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length());
ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
listColumnVector.childCount = col2.length;
listColumnVector.lengths[0] = 3;
for (int idx = 0; idx < col2.length; idx++) {
((BytesColumnVector) listColumnVector.child).setRef(
idx,
StringUtils.toUtf8(col2[idx]),
0,
col2[idx].length()
);
}
((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
writer.addRowBatch(batch);
writer.close();
return testOrc;
}
private File makeOrcFileWithDate() throws IOException
{
final File dir = temporaryFolder.newFolder();
final File testOrc = new File(dir, "test-2.orc");
TypeDescription schema = TypeDescription.createStruct()
.addField("timestamp", TypeDescription.createDate())
.addField("col1", TypeDescription.createString())
.addField("col2", TypeDescription.createList(TypeDescription.createString()))
.addField("val1", TypeDescription.createFloat());
Configuration conf = new Configuration();
Writer writer = OrcFile.createWriter(
new Path(testOrc.getPath()),
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
.bufferSize(10000)
.compress(CompressionKind.ZLIB)
.version(OrcFile.Version.CURRENT)
);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = 1;
DateTime ts = DateTimes.of(timestamp);
// date is stored as long column vector with number of days since epoch
((LongColumnVector) batch.cols[0]).vector[0] =
TimeUnit.MILLISECONDS.toDays(ts.minus(DateTimes.EPOCH.getMillis()).getMillis());
((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length());
ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
listColumnVector.childCount = col2.length;
listColumnVector.lengths[0] = 3;
for (int idx = 0; idx < col2.length; idx++) {
((BytesColumnVector) listColumnVector.child).setRef(
idx,
StringUtils.toUtf8(col2[idx]),
0,
col2[idx].length()
);
}
((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
writer.addRowBatch(batch);
writer.close();
return testOrc;
}
}

View File

@@ -1,188 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.name.Names;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
public class OrcHadoopInputRowParserTest
{
Injector injector;
ObjectMapper mapper = new DefaultObjectMapper();
@Before
public void setUp()
{
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
ImmutableList.of(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
}
},
new OrcExtensionsModule()
)
);
mapper = injector.getInstance(ObjectMapper.class);
}
@Test
public void testSerde() throws IOException
{
String parserString = "{\n" +
" \"type\": \"orc\",\n" +
" \"parseSpec\": {\n" +
" \"format\": \"timeAndDims\",\n" +
" \"timestampSpec\": {\n" +
" \"column\": \"timestamp\",\n" +
" \"format\": \"auto\"\n" +
" },\n" +
" \"dimensionsSpec\": {\n" +
" \"dimensions\": [\n" +
" \"col1\",\n" +
" \"col2\"\n" +
" ],\n" +
" \"dimensionExclusions\": [],\n" +
" \"spatialDimensions\": []\n" +
" }\n" +
" },\n" +
" \"typeString\": \"struct<timestamp:string,col1:string,col2:array<string>,val1:float>\"\n" +
" }";
InputRowParser parser = mapper.readValue(parserString, InputRowParser.class);
InputRowParser expected = new OrcHadoopInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
ImmutableList.of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")),
null,
null
)
),
"struct<timestamp:string,col1:string,col2:array<string>,val1:float>",
null
);
Assert.assertEquals(expected, parser);
}
@Test
public void testTypeFromParseSpec()
{
ParseSpec parseSpec = new TimeAndDimsParseSpec(
new TimestampSpec(
"timestamp",
"auto",
null
),
new DimensionsSpec(
ImmutableList.of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")),
null,
null
)
);
String typeString = OrcHadoopInputRowParser.typeStringFromParseSpec(parseSpec);
String expected = "struct<timestamp:string,col1:string,col2:string>";
Assert.assertEquals(expected, typeString);
}
@Test
public void testParse()
{
final String typeString = "struct<timestamp:string,col1:string,col2:array<string>,col3:float,col4:bigint,col5:decimal,col6:array<string>,col7:map<string,string>>";
final OrcHadoopInputRowParser parser = new OrcHadoopInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null, null, null)
),
typeString,
"<PARENT>-<CHILD>"
);
final SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct.createObjectInspector(
TypeInfoUtils.getTypeInfoFromTypeString(typeString)
);
final OrcStruct struct = (OrcStruct) oi.create();
struct.setNumFields(8);
oi.setStructFieldData(struct, oi.getStructFieldRef("timestamp"), new Text("2000-01-01"));
oi.setStructFieldData(struct, oi.getStructFieldRef("col1"), new Text("foo"));
oi.setStructFieldData(struct, oi.getStructFieldRef("col2"), ImmutableList.of(new Text("foo"), new Text("bar")));
oi.setStructFieldData(struct, oi.getStructFieldRef("col3"), new FloatWritable(1.5f));
oi.setStructFieldData(struct, oi.getStructFieldRef("col4"), new LongWritable(2));
oi.setStructFieldData(
struct,
oi.getStructFieldRef("col5"),
new HiveDecimalWritable(HiveDecimal.create(BigDecimal.valueOf(3.5d)))
);
oi.setStructFieldData(struct, oi.getStructFieldRef("col6"), null);
oi.setStructFieldData(struct, oi.getStructFieldRef("col7"), ImmutableMap.of(new Text("subcol7"), new Text("subval7")));
final InputRow row = parser.parseBatch(struct).get(0);
Assert.assertEquals("timestamp", DateTimes.of("2000-01-01"), row.getTimestamp());
Assert.assertEquals("col1", "foo", row.getRaw("col1"));
Assert.assertEquals("col2", ImmutableList.of("foo", "bar"), row.getRaw("col2"));
Assert.assertEquals("col3", 1.5f, row.getRaw("col3"));
Assert.assertEquals("col4", 2L, row.getRaw("col4"));
Assert.assertEquals("col5", 3.5d, row.getRaw("col5"));
Assert.assertNull("col6", row.getRaw("col6"));
Assert.assertEquals("col7-subcol7", "subval7", row.getRaw("col7-subcol7"));
}
}

View File

@@ -1,404 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexer.HadoopyShardSpec;
import org.apache.druid.indexer.IndexGeneratorJob;
import org.apache.druid.indexer.JobHelper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexIndexableAdapter;
import org.apache.druid.segment.RowIterator;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
public class OrcIndexGeneratorJobTest
{
private static final AggregatorFactory[] aggs = {
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
};
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
private ObjectMapper mapper;
private HadoopDruidIndexerConfig config;
private final String dataSourceName = "website";
private final List<String> data = ImmutableList.of(
"2014102200,a.example.com,100",
"2014102200,b.exmaple.com,50",
"2014102200,c.example.com,200",
"2014102200,d.example.com,250",
"2014102200,e.example.com,123",
"2014102200,f.example.com,567",
"2014102200,g.example.com,11",
"2014102200,h.example.com,251",
"2014102200,i.example.com,963",
"2014102200,j.example.com,333",
"2014102212,a.example.com,100",
"2014102212,b.exmaple.com,50",
"2014102212,c.example.com,200",
"2014102212,d.example.com,250",
"2014102212,e.example.com,123",
"2014102212,f.example.com,567",
"2014102212,g.example.com,11",
"2014102212,h.example.com,251",
"2014102212,i.example.com,963",
"2014102212,j.example.com,333"
);
private final Interval interval = Intervals.of("2014-10-22T00:00:00Z/P1D");
private File dataRoot;
private File outputRoot;
private Integer[][][] shardInfoForEachSegment = new Integer[][][]{
{
{0, 4},
{1, 4},
{2, 4},
{3, 4}
}
};
private final InputRowParser inputRowParser = new OrcHadoopInputRowParser(
new TimeAndDimsParseSpec(
new TimestampSpec("timestamp", "yyyyMMddHH", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null)
),
"struct<timestamp:string,host:string,visited_num:int>",
null
);
private File writeDataToLocalOrcFile(File outputDir, List<String> data) throws IOException
{
File outputFile = new File(outputDir, "test.orc");
TypeDescription schema = TypeDescription.createStruct()
.addField("timestamp", TypeDescription.createString())
.addField("host", TypeDescription.createString())
.addField("visited_num", TypeDescription.createInt());
Configuration conf = new Configuration();
Writer writer = OrcFile.createWriter(
new Path(outputFile.getPath()),
OrcFile.writerOptions(conf)
.setSchema(schema)
.stripeSize(100000)
.bufferSize(10000)
.compress(CompressionKind.ZLIB)
.version(OrcFile.Version.CURRENT)
);
VectorizedRowBatch batch = schema.createRowBatch();
batch.size = data.size();
for (int idx = 0; idx < data.size(); idx++) {
String line = data.get(idx);
String[] lineSplit = line.split(",");
((BytesColumnVector) batch.cols[0]).setRef(
idx,
StringUtils.toUtf8(lineSplit[0]),
0,
lineSplit[0].length()
);
((BytesColumnVector) batch.cols[1]).setRef(
idx,
StringUtils.toUtf8(lineSplit[1]),
0,
lineSplit[1].length()
);
((LongColumnVector) batch.cols[2]).vector[idx] = Long.parseLong(lineSplit[2]);
}
writer.addRowBatch(batch);
writer.close();
return outputFile;
}
@Before
public void setUp() throws Exception
{
mapper = HadoopDruidIndexerConfig.JSON_MAPPER;
mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
dataRoot = temporaryFolder.newFolder("data");
outputRoot = temporaryFolder.newFolder("output");
File dataFile = writeDataToLocalOrcFile(dataRoot, data);
HashMap<String, Object> inputSpec = new HashMap<String, Object>();
inputSpec.put("paths", dataFile.getCanonicalPath());
inputSpec.put("type", "static");
inputSpec.put("inputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat");
config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
dataSourceName,
mapper.convertValue(
inputRowParser,
Map.class
),
aggs,
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)),
null,
mapper
),
new HadoopIOConfig(
ImmutableMap.copyOf(inputSpec),
null,
outputRoot.getCanonicalPath()
),
new HadoopTuningConfig(
outputRoot.getCanonicalPath(),
null,
null,
null,
null,
null,
null,
true,
false,
false,
false,
ImmutableMap.of(MRJobConfig.NUM_REDUCES, "0"), // verifies that an explicitly set number of reducers is ignored
false,
true,
null,
true,
null,
false,
false,
null,
null,
null
)
)
);
config.setShardSpecs(
loadShardSpecs(shardInfoForEachSegment)
);
config = HadoopDruidIndexerConfig.fromSpec(config.getSchema());
}
@Test
public void testIndexGeneratorJob() throws IOException
{
verifyJob(new IndexGeneratorJob(config));
}
private void verifyJob(IndexGeneratorJob job) throws IOException
{
Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
IndexGeneratorJob
.getPublishedSegments(config)
.forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>())
.add(segment));
final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
int segmentNum = 0;
for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) {
Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
File segmentOutputFolder = new File(
StringUtils.format(
"%s/%s/%s_%s/%s",
config.getSchema().getIOConfig().getSegmentOutputPath(),
config.getSchema().getDataSchema().getDataSource(),
currTime.toString(),
currTime.plusDays(1).toString(),
config.getSchema().getTuningConfig().getVersion()
)
);
Assert.assertTrue(segmentOutputFolder.exists());
Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length);
for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) {
File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum));
Assert.assertTrue(individualSegmentFolder.exists());
File indexZip = new File(individualSegmentFolder, "index.zip");
Assert.assertTrue(indexZip.exists());
intervalToIndexFiles.computeIfAbsent(new Interval(currTime, currTime.plusDays(1)), k -> new ArrayList<>())
.add(indexZip);
}
}
Assert.assertEquals(intervalToSegments.size(), intervalToIndexFiles.size());
segmentNum = 0;
for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
final Interval interval = entry.getKey();
final List<DataSegment> segments = entry.getValue();
final List<File> indexFiles = intervalToIndexFiles.get(interval);
Assert.assertNotNull(indexFiles);
Collections.sort(segments);
indexFiles.sort(Comparator.comparing(File::getAbsolutePath));
Assert.assertEquals(segments.size(), indexFiles.size());
Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
int rowCount = 0;
for (int i = 0; i < segments.size(); i++) {
final DataSegment dataSegment = segments.get(i);
final File indexZip = indexFiles.get(i);
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
Assert.assertEquals(dataSourceName, dataSegment.getDataSource());
Assert.assertEquals(1, dataSegment.getDimensions().size());
String[] dimensions = dataSegment.getDimensions().toArray(new String[0]);
Arrays.sort(dimensions);
Assert.assertEquals("host", dimensions[0]);
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Integer[] hashShardInfo = shardInfo[i];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions());
File dir = Files.createTempDir();
unzip(indexZip, dir);
QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir);
QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
try (RowIterator rowIt = adapter.getRows()) {
while (rowIt.moveToNext()) {
rowCount++;
Assert.assertEquals(2, rowIt.getPointer().getNumMetrics());
}
}
}
Assert.assertEquals(rowCount, data.size());
}
}
private Map<Long, List<HadoopyShardSpec>> loadShardSpecs(
Integer[][][] shardInfoForEachShard
)
{
Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance());
int shardCount = 0;
int segmentNum = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
List<ShardSpec> specs = new ArrayList<>();
for (Integer[] shardInfo : shardInfoForEachShard[segmentNum++]) {
specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER));
}
List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
for (ShardSpec spec : specs) {
actualSpecs.add(new HadoopyShardSpec(spec, shardCount++));
}
shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
}
return shardSpecs;
}
private void unzip(File zip, File outDir)
{
try {
long size = 0L;
final byte[] buffer = new byte[1 << 13];
try (ZipInputStream in = new ZipInputStream(new FileInputStream(zip))) {
for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
final String fileName = entry.getName();
try (final OutputStream out = new BufferedOutputStream(
new FileOutputStream(
outDir.getAbsolutePath()
+ File.separator
+ fileName
), 1 << 13
)) {
for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
if (len == 0) {
continue;
}
size += len;
out.write(buffer, 0, len);
}
out.flush();
}
}
}
}
catch (IOException | RuntimeException exception) {
// surface unzip failures instead of silently swallowing them
throw new RuntimeException(exception);
}
}
}

View File

@ -0,0 +1,85 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "example/orc-file-11-format.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "struct_list_struct_int",
"expr": "$.middle.list[1].int1"
},
{
"type": "path",
"name": "struct_list_struct_intlist",
"expr": "$.middle.list[*].int1"
},
{
"type": "path",
"name": "list_struct_string",
"expr": "$.list[0].string1"
},
{
"type": "path",
"name": "map_struct_int",
"expr": "$.map.chani.int1"
}
]
},
"timestampSpec": {
"column": "ts",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2015-01-01/2017-01-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}
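
For reference, the flattenSpec above corresponds directly to the new Java parse spec classes in this patch. A rough sketch of the programmatic equivalent (not part of the patch; imports omitted; assumes JSONPathSpec/JSONPathFieldSpec take the same properties as their JSON form):

// sketch: build the same flattenSpec/timestampSpec programmatically
JSONPathSpec flattenSpec = new JSONPathSpec(
    true, // useFieldDiscovery
    ImmutableList.of(
        new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_int", "$.middle.list[1].int1"),
        new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_intlist", "$.middle.list[*].int1"),
        new JSONPathFieldSpec(JSONPathFieldType.PATH, "list_struct_string", "$.list[0].string1"),
        new JSONPathFieldSpec(JSONPathFieldType.PATH, "map_struct_int", "$.map.chani.int1")
    )
);
// a null dimensionsSpec falls back to DimensionsSpec.EMPTY, i.e. schemaless dimension discovery
ParseSpec parseSpec = new OrcParseSpec(new TimestampSpec("ts", "millis", null), null, flattenSpec);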

View File

@ -0,0 +1,64 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "example/orc_split_elim.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": []
},
"timestampSpec": {
"column": "ts",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2015-01-01/2017-01-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@ -0,0 +1,64 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "example/TestOrcFile.testDate1900.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": []
},
"timestampSpec": {
"column": "time",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2015-01-01/2017-01-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

View File

@ -0,0 +1,64 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "example/TestOrcFile.testDate2038.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": []
},
"timestampSpec": {
"column": "time",
"format": "millis"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [],
"spatialDimensions": []
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2015-01-01/2017-01-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

Binary file not shown.

View File

@ -0,0 +1,54 @@
{
"type": "index_hadoop",
"spec": {
"ioConfig": {
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "example/test_1.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
"connectURI": "jdbc:postgresql://localhost/druid",
"user" : "druid",
"password" : "asdf",
"segmentTable": "druid_segments"
},
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
"format": "orc",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
}
}
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"intervals": ["2015-01-01/2017-01-01"]
}
},
"tuningConfig": {
"type": "hadoop",
"workingPath": "tmp/working_path",
"partitionsSpec": {
"targetPartitionSize": 5000000
},
"jobProperties" : {
"mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
"mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
},
"leaveIntermediate": true
}
}
}

Binary file not shown.

View File

@ -5,8 +5,8 @@
"type": "hadoop",
"inputSpec": {
"type": "static",
"inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
"paths": "wikipedia.gz.orc"
"inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
"paths": "example/test_2.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
@ -18,26 +18,32 @@
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
"dataSource": "wikipedia",
"dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
"format": "timeAndDims",
"format": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "col7-subcol7",
"expr": "$.col7.subcol7"
}
]
},
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"timestamp",
"col1",
"col2"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
},
"typeString": "struct<timestamp:string,col1:string,col2:array<string>,val1:float>"
}
},
"metricsSpec": [],
"granularitySpec": {

View File

@ -0,0 +1,143 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-orc-extensions</artifactId>
<name>druid-orc-extensions</name>
<description>druid-orc-extensions</description>
<parent>
<artifactId>druid</artifactId>
<groupId>org.apache.druid</groupId>
<version>0.15.0-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<properties>
<orc.version>1.5.5</orc.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.compile.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
<version>${orc.version}</version>
<exclusions>
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -36,7 +36,8 @@ public class OrcExtensionsModule implements DruidModule
return Collections.singletonList(
new SimpleModule("OrcInputRowParserModule")
.registerSubtypes(
new NamedType(OrcHadoopInputRowParser.class, "orc")
new NamedType(OrcHadoopInputRowParser.class, "orc"),
new NamedType(OrcParseSpec.class, "orc")
)
);
}
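
Registering both NamedTypes is what lets an ObjectMapper with this module installed materialize the "type": "orc" parser and the "format": "orc" parseSpec straight from ingestion spec JSON. A minimal sketch (not part of the patch; imports omitted; DefaultObjectMapper and the JSON literal are illustrative):

// sketch: deserialize an "orc" parser the same way the indexer does
ObjectMapper mapper = new DefaultObjectMapper();
new OrcExtensionsModule().getJacksonModules().forEach(mapper::registerModule);
String parserJson = "{\"type\":\"orc\",\"parseSpec\":{\"format\":\"orc\","
                    + "\"timestampSpec\":{\"column\":\"ts\",\"format\":\"millis\"}}}";
InputRowParser parser = mapper.readValue(parserJson, InputRowParser.class);
// parser is an OrcHadoopInputRowParser wrapping an OrcParseSpec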

View File

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.orc.mapred.OrcStruct;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
{
private final ParseSpec parseSpec;
private final ObjectFlattener<OrcStruct> orcStructFlattener;
private final MapInputRowParser parser;
private final boolean binaryAsString;
@JsonCreator
public OrcHadoopInputRowParser(
@JsonProperty("parseSpec") ParseSpec parseSpec,
@Nullable @JsonProperty("binaryAsString") Boolean binaryAsString
)
{
this.parseSpec = parseSpec;
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
final JSONPathSpec flattenSpec;
if (parseSpec instanceof OrcParseSpec) {
flattenSpec = ((OrcParseSpec) parseSpec).getFlattenSpec();
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}
this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(this.binaryAsString));
this.parser = new MapInputRowParser(parseSpec);
}
@NotNull
@Override
public List<InputRow> parseBatch(OrcStruct input)
{
return parser.parseBatch(orcStructFlattener.flatten(input));
}
@Override
public ParseSpec getParseSpec()
{
return parseSpec;
}
@Override
public InputRowParser withParseSpec(ParseSpec parseSpec)
{
return new OrcHadoopInputRowParser(parseSpec, binaryAsString);
}
}
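
A minimal usage sketch of the parser outside of a Hadoop job, against a hand-built OrcStruct (not part of the patch; the schema, values, and variable names are illustrative; imports omitted):

// sketch: parse a hand-built OrcStruct row directly
TypeDescription schema = TypeDescription.fromString("struct<ts:bigint,host:string,visited_num:int>");
OrcStruct struct = new OrcStruct(schema);
struct.setFieldValue("ts", new LongWritable(1546300800000L));
struct.setFieldValue("host", new Text("a.example.com"));
struct.setFieldValue("visited_num", new IntWritable(3));

InputRowParser<OrcStruct> parser = new OrcHadoopInputRowParser(
    new OrcParseSpec(new TimestampSpec("ts", "millis", null), null, null),
    null // binaryAsString defaults to false
);
InputRow row = parser.parseBatch(struct).get(0);
// with no dimensionsSpec, the non-timestamp fields are discovered as dimensions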

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.NestedDataParseSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
public class OrcParseSpec extends NestedDataParseSpec<JSONPathSpec>
{
@JsonCreator
public OrcParseSpec(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
@JsonProperty("flattenSpec") JSONPathSpec flattenSpec
)
{
super(
timestampSpec,
dimensionsSpec != null ? dimensionsSpec : DimensionsSpec.EMPTY,
flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT
);
}
@Override
public ParseSpec withTimestampSpec(TimestampSpec spec)
{
return new OrcParseSpec(spec, getDimensionsSpec(), getFlattenSpec());
}
@Override
public ParseSpec withDimensionsSpec(DimensionsSpec spec)
{
return new OrcParseSpec(getTimestampSpec(), spec, getFlattenSpec());
}
@Override
public String toString()
{
return "OrcParseSpec{" +
"timestampSpec=" + getTimestampSpec() +
", dimensionsSpec=" + getDimensionsSpec() +
", flattenSpec=" + getFlattenSpec() +
"}";
}
}

View File

@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcList;
import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcTimestamp;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class OrcStructConverter
{
@Nonnull
private static List<Object> convertList(TypeDescription fieldDescription, OrcList orcList, boolean binaryAsString)
{
// if primitive list, convert primitives
TypeDescription listType = fieldDescription.getChildren().get(0);
if (listType.getCategory().isPrimitive()) {
return (List<Object>) orcList.stream()
.map(li -> convertPrimitive(listType, (WritableComparable) li, binaryAsString))
.collect(Collectors.toList());
}
return new ArrayList<Object>(orcList);
}
private static Map<Object, Object> convertMap(
TypeDescription fieldDescription,
OrcMap<? extends WritableComparable, ? extends WritableComparable> map,
boolean binaryAsString
)
{
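// map keys are converted with convertPrimitive; values are only converted when the value type is
// primitive, otherwise the raw Writable value is passed through untouched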
Map<Object, Object> converted = new HashMap<>();
TypeDescription keyDescription = fieldDescription.getChildren().get(0);
TypeDescription valueDescription = fieldDescription.getChildren().get(1);
for (WritableComparable key : map.navigableKeySet()) {
Object newKey = convertPrimitive(keyDescription, key, binaryAsString);
if (valueDescription.getCategory().isPrimitive()) {
converted.put(newKey, convertPrimitive(valueDescription, map.get(key), binaryAsString));
} else {
converted.put(newKey, map.get(key));
}
}
return converted;
}
private static Object convertPrimitive(TypeDescription fieldDescription, WritableComparable field, boolean binaryAsString)
{
/*
ORC TYPE WRITABLE TYPE
binary org.apache.hadoop.io.BytesWritable
bigint org.apache.hadoop.io.LongWritable
boolean org.apache.hadoop.io.BooleanWritable
char org.apache.hadoop.io.Text
date org.apache.hadoop.hive.serde2.io.DateWritable
decimal org.apache.hadoop.hive.serde2.io.HiveDecimalWritable
double org.apache.hadoop.io.DoubleWritable
float org.apache.hadoop.io.FloatWritable
int org.apache.hadoop.io.IntWritable
smallint org.apache.hadoop.io.ShortWritable
string org.apache.hadoop.io.Text
timestamp org.apache.orc.mapred.OrcTimestamp
tinyint org.apache.hadoop.io.ByteWritable
varchar org.apache.hadoop.io.Text
*/
switch (fieldDescription.getCategory()) {
case STRING:
case CHAR:
case VARCHAR:
return ((Text) field).toString();
case BOOLEAN:
return ((BooleanWritable) field).get();
case BYTE:
return ((ByteWritable) field).get();
case SHORT:
return ((ShortWritable) field).get();
case INT:
return ((IntWritable) field).get();
case LONG:
return ((LongWritable) field).get();
case FLOAT:
return ((FloatWritable) field).get();
case DOUBLE:
return ((DoubleWritable) field).get();
case DECIMAL:
return ((HiveDecimalWritable) field).getHiveDecimal().doubleValue();
case TIMESTAMP:
return ((OrcTimestamp) field).getTime();
case DATE:
return DateTimes.utc(((DateWritable) field).get().getTime());
case BINARY:
byte[] bytes = ((BytesWritable) field).getBytes();
if (binaryAsString) {
return StringUtils.fromUtf8(bytes);
} else {
return bytes;
}
default:
return null;
}
}
private final boolean binaryAsString;
private Object2IntMap<String> fieldIndexCache;
OrcStructConverter(boolean binaryAsString)
{
this.binaryAsString = binaryAsString;
}
/**
* Convert an ORC struct field of the "root" {@link OrcStruct} that represents the "row". This method has a cache of
* field names to field index that is ONLY valid for this {@link OrcStruct}, and should not be used for
* nested {@link OrcStruct} fields of the row. Looks up field index by field name, and delegates to
* {@link OrcStructConverter#convertField(OrcStruct, int)}.
*/
@Nullable
Object convertRootField(OrcStruct struct, String fieldName)
{
// this cache is only valid for the root level, to skip the indexOf on fieldNames to get the fieldIndex.
TypeDescription schema = struct.getSchema();
final List<String> fields = schema.getFieldNames();
if (fieldIndexCache == null) {
fieldIndexCache = new Object2IntOpenHashMap<>(fields.size());
for (int i = 0; i < fields.size(); i++) {
fieldIndexCache.put(fields.get(i), i);
}
}
int fieldIndex = fieldIndexCache.getOrDefault(fieldName, -1);
return convertField(struct, fieldIndex);
}
/**
* Convert an ORC struct field as though it were a map, by fieldIndex. Complex types will be transformed
* into java lists and maps when possible ({@link OrcStructConverter#convertList} and
* {@link OrcStructConverter#convertMap}), and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*
* Note: "Union" types are not currently supported and will be returned as null
*/
@Nullable
Object convertField(OrcStruct struct, int fieldIndex)
{
if (fieldIndex < 0) {
return null;
}
TypeDescription schema = struct.getSchema();
TypeDescription fieldDescription = schema.getChildren().get(fieldIndex);
WritableComparable fieldValue = struct.getFieldValue(fieldIndex);
if (fieldValue == null) {
return null;
}
if (fieldDescription.getCategory().isPrimitive()) {
return convertPrimitive(fieldDescription, fieldValue, binaryAsString);
} else {
// handle complex column types
/*
ORC TYPE WRITABLE TYPE
array org.apache.orc.mapred.OrcList
map org.apache.orc.mapred.OrcMap
struct org.apache.orc.mapred.OrcStruct
uniontype org.apache.orc.mapred.OrcUnion
*/
switch (fieldDescription.getCategory()) {
case LIST:
OrcList orcList = (OrcList) fieldValue;
return convertList(fieldDescription, orcList, binaryAsString);
case MAP:
OrcMap map = (OrcMap) fieldValue;
return convertMap(fieldDescription, map, binaryAsString);
case STRUCT:
OrcStruct structMap = (OrcStruct) fieldValue;
return convertStructToMap(structMap);
case UNION:
// sorry union types :(
default:
return null;
}
}
}
private Map<String, Object> convertStructToMap(OrcStruct map)
{
Map<String, Object> converted = new HashMap<>();
List<String> fieldNames = map.getSchema().getFieldNames();
for (int i = 0; i < fieldNames.size(); i++) {
converted.put(fieldNames.get(i), convertField(map, i));
}
return converted;
}
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcList;
import org.apache.orc.mapred.OrcMap;
import org.apache.orc.mapred.OrcStruct;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Function;
public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker<OrcStruct>
{
private final Configuration jsonPathConfiguration;
private final OrcStructConverter converter;
OrcStructFlattenerMaker(boolean binaryAsString)
{
this.converter = new OrcStructConverter(binaryAsString);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(new OrcStructJsonProvider(converter))
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
}
@Override
public Iterable<String> discoverRootFields(OrcStruct obj)
{
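// only root-level primitive fields, and lists of primitives, are auto-discovered as dimensions;
// nested structs, maps, and lists of complex types must be extracted explicitly via flattenSpec "path" fields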
List<String> fields = obj.getSchema().getFieldNames();
List<TypeDescription> children = obj.getSchema().getChildren();
List<String> primitiveFields = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
if (children.get(i).getCategory().isPrimitive() || (children.get(i).getCategory().equals(TypeDescription.Category.LIST) &&
children.get(i).getChildren().get(0).getCategory().isPrimitive())) {
primitiveFields.add(fields.get(i));
}
}
return primitiveFields;
}
@Override
public Object getRootField(OrcStruct obj, String key)
{
return finalizeConversion(converter.convertRootField(obj, key));
}
@Override
public Function<OrcStruct, Object> makeJsonPathExtractor(String expr)
{
final JsonPath jsonPath = JsonPath.compile(expr);
return record -> {
Object val = jsonPath.read(record, jsonPathConfiguration);
return finalizeConversion(val);
};
}
@Nullable
@Override
public Function<OrcStruct, Object> makeJsonQueryExtractor(String expr)
{
throw new UnsupportedOperationException("ORC flattener does not support JQ");
}
private Object finalizeConversion(Object o)
{
// replace any remaining complex types with null
if (o instanceof OrcStruct || o instanceof OrcMap || o instanceof OrcList) {
return null;
}
return o;
}
}
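
For a sense of how this plugs into Druid's generic flattening machinery, a rough sketch (not part of the patch; imports omitted; the constructor is package-private so this would live in the same package, and `struct` stands for any OrcStruct row):

// sketch: flatten an OrcStruct row into a plain Map
ObjectFlattener<OrcStruct> flattener =
    ObjectFlatteners.create(JSONPathSpec.DEFAULT, new OrcStructFlattenerMaker(false));
Map<String, Object> flat = flattener.flatten(struct); // e.g. {ts=..., host=..., visited_num=3}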

View File

@ -0,0 +1,190 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.jayway.jsonpath.InvalidJsonException;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.orc.mapred.OrcStruct;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class OrcStructJsonProvider implements JsonProvider
{
private final OrcStructConverter converter;
OrcStructJsonProvider(OrcStructConverter converter)
{
this.converter = converter;
}
@Override
public Object createArray()
{
return new ArrayList<>();
}
@Override
public Object createMap()
{
return new HashMap<>();
}
@Override
public boolean isArray(final Object o)
{
return o instanceof List;
}
@Override
public boolean isMap(final Object o)
{
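// OrcStruct is treated as a map so JsonPath expressions can descend into nested structs;
// null is also treated as a map so a missing intermediate field simply resolves to null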
return o == null || o instanceof Map || o instanceof OrcStruct;
}
@Override
public int length(final Object o)
{
if (o instanceof List) {
return ((List) o).size();
} else {
return 0;
}
}
@Override
public Iterable<?> toIterable(final Object o)
{
if (o instanceof List) {
return (List) o;
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Collection<String> getPropertyKeys(final Object o)
{
if (o == null) {
return Collections.emptySet();
} else if (o instanceof Map) {
return ((Map<Object, Object>) o).keySet().stream().map(String::valueOf).collect(Collectors.toSet());
} else if (o instanceof OrcStruct) {
return ((OrcStruct) o).getSchema().getFieldNames();
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public Object getMapValue(final Object o, final String s)
{
if (o == null) {
return null;
} else if (o instanceof Map) {
return ((Map) o).get(s);
} else if (o instanceof OrcStruct) {
OrcStruct struct = (OrcStruct) o;
// get field by index since we have no way to know if this map is the root or not
return converter.convertField(struct, struct.getSchema().getFieldNames().indexOf(s));
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public Object getArrayIndex(final Object o, final int i)
{
if (o instanceof List) {
return ((List) o).get(i);
}
throw new UnsupportedOperationException(o.getClass().getName());
}
@Override
public void setArrayIndex(final Object o, final int i, final Object o1)
{
if (o instanceof List) {
final List list = (List) o;
if (list.size() == i) {
list.add(o1);
} else {
list.set(i, o1);
}
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void setProperty(final Object o, final Object o1, final Object o2)
{
if (o instanceof Map) {
((Map) o).put(o1, o2);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
public void removeProperty(final Object o, final Object o1)
{
if (o instanceof Map) {
((Map) o).remove(o1);
} else {
throw new UnsupportedOperationException(o.getClass().getName());
}
}
@Override
@Deprecated
public Object getArrayIndex(final Object o, final int i, final boolean b)
{
throw new UnsupportedOperationException("Deprecated");
}
@Override
public Object parse(final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object parse(final InputStream inputStream, final String s) throws InvalidJsonException
{
throw new UnsupportedOperationException("Unused");
}
@Override
public String toJson(final Object o)
{
throw new UnsupportedOperationException("Unused");
}
@Override
public Object unwrap(final Object o)
{
throw new UnsupportedOperationException("Unused");
}
}

View File

@ -0,0 +1,271 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.path.StaticPathSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class OrcHadoopInputRowParserTest
{
@Test
public void testTest1() throws IOException, InterruptedException
{
// fully auto-discovered fields (no flattenSpec, no dimensionsSpec)
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/test_1_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
/*
test_1.orc
struct<timestamp:string,col1:string,col2:array<string>,val1:float>
{2016-01-01T00:00:00.000Z, bar, [dat1, dat2, dat3], 1.1}
*/
OrcStruct data = getFirstRow(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
assertEquals(3, rows.get(0).getDimensions().size());
assertEquals("bar", rows.get(0).getDimension("col1").get(0));
String s1 = rows.get(0).getDimension("col2").get(0);
String s2 = rows.get(0).getDimension("col2").get(1);
String s3 = rows.get(0).getDimension("col2").get(2);
assertEquals("dat1", s1);
assertEquals("dat2", s2);
assertEquals("dat3", s3);
}
@Test
public void testTest2() throws IOException, InterruptedException
{
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/test_2_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
/*
test_2.orc
struct<timestamp:string,col1:string,col2:array<string>,col3:float,col4:bigint,col5:decimal,col6:array<string>,col7:map<string,string>>
{2016-01-01, bar, [dat1, dat2, dat3], 1.1, 2, 3.5, [], {subcol7=subval7}}
*/
OrcStruct data = getFirstRow(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
assertEquals(7, rows.get(0).getDimensions().size());
assertEquals("bar", rows.get(0).getDimension("col1").get(0));
assertEquals("dat1", rows.get(0).getDimension("col2").get(0));
assertEquals("dat2", rows.get(0).getDimension("col2").get(1));
assertEquals("dat3", rows.get(0).getDimension("col2").get(2));
assertEquals(1.1f, rows.get(0).getRaw("col3"));
assertEquals(2L, rows.get(0).getRaw("col4"));
assertEquals(3.5d, rows.get(0).getRaw("col5"));
assertEquals(ImmutableList.of(), rows.get(0).getRaw("col6"));
assertEquals("subval7", rows.get(0).getRaw("col7-subcol7"));
}
@Test
public void testOrcFile11Format() throws IOException, InterruptedException
{
// orc-file-11-format.orc is an example file from the Apache ORC project, written with the legacy 0.11 file format
/*
orc-file-11-format.orc
struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int,long1:bigint,float1:float,double1:double,bytes1:binary,string1:string,middle:struct<list:array<struct<int1:int,string1:string>>>,list:array<struct<int1:int,string1:string>>,map:map<string,struct<int1:int,string1:string>>,ts:timestamp,decimal1:decimal(38,10)>
{false, 1, 1024, 65536, 9223372036854775807, 1.0, -15.0, 00 01 02 03 04, hi, {[{1, bye}, {2, sigh}]}, [{3, good}, {4, bad}], {}, 2000-03-12 15:00:00.0, 12345678.6547456}
*/
HadoopDruidIndexerConfig config =
loadHadoopDruidIndexerConfig("example/orc-file-11-format-hadoop-job.json");
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
OrcStruct data = getFirstRow(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
assertEquals(14, rows.get(0).getDimensions().size());
assertEquals("false", rows.get(0).getDimension("boolean1").get(0));
assertEquals("1", rows.get(0).getDimension("byte1").get(0));
assertEquals("1024", rows.get(0).getDimension("short1").get(0));
assertEquals("65536", rows.get(0).getDimension("int1").get(0));
assertEquals("9223372036854775807", rows.get(0).getDimension("long1").get(0));
assertEquals("1.0", rows.get(0).getDimension("float1").get(0));
assertEquals("-15.0", rows.get(0).getDimension("double1").get(0));
assertEquals("AAECAwQAAA==", rows.get(0).getDimension("bytes1").get(0));
assertEquals("hi", rows.get(0).getDimension("string1").get(0));
assertEquals("1.23456786547456E7", rows.get(0).getDimension("decimal1").get(0));
assertEquals("2", rows.get(0).getDimension("struct_list_struct_int").get(0));
assertEquals("1", rows.get(0).getDimension("struct_list_struct_intlist").get(0));
assertEquals("2", rows.get(0).getDimension("struct_list_struct_intlist").get(1));
assertEquals("good", rows.get(0).getDimension("list_struct_string").get(0));
assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), rows.get(0).getTimestamp());
// the first row has an empty 'map' column, so let's read another one
List<InputRow> allRows = getAllRows(config);
InputRow anotherRow = allRows.get(1);
assertEquals(14, rows.get(0).getDimensions().size());
assertEquals("true", anotherRow.getDimension("boolean1").get(0));
assertEquals("100", anotherRow.getDimension("byte1").get(0));
assertEquals("2048", anotherRow.getDimension("short1").get(0));
assertEquals("65536", anotherRow.getDimension("int1").get(0));
assertEquals("9223372036854775807", anotherRow.getDimension("long1").get(0));
assertEquals("2.0", anotherRow.getDimension("float1").get(0));
assertEquals("-5.0", anotherRow.getDimension("double1").get(0));
assertEquals("AAECAwQAAA==", rows.get(0).getDimension("bytes1").get(0));
assertEquals("bye", anotherRow.getDimension("string1").get(0));
assertEquals("1.23456786547457E7", anotherRow.getDimension("decimal1").get(0));
assertEquals("2", anotherRow.getDimension("struct_list_struct_int").get(0));
assertEquals("cat", anotherRow.getDimension("list_struct_string").get(0));
assertEquals("5", anotherRow.getDimension("map_struct_int").get(0));
}
@Test
public void testOrcSplitElim() throws IOException, InterruptedException
{
// orc_split_elim.orc is an Apache ORC example file originally used for split (stripe) elimination tests
/*
orc_split_elim.orc
struct<userid:bigint,string1:string,subtype:double,decimal1:decimal(38,10),ts:timestamp>
{2, foo, 0.8, 1.2, 1969-12-31 16:00:00.0}
*/
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/orc_split_elim_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
OrcStruct data = getFirstRow(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
assertEquals(4, rows.get(0).getDimensions().size());
assertEquals("2", rows.get(0).getDimension("userid").get(0));
assertEquals("foo", rows.get(0).getDimension("string1").get(0));
assertEquals("0.8", rows.get(0).getDimension("subtype").get(0));
assertEquals("1.2", rows.get(0).getDimension("decimal1").get(0));
assertEquals(DateTimes.of("1969-12-31T16:00:00.0Z"), rows.get(0).getTimestamp());
}
@Test
public void testDate1900() throws IOException, InterruptedException
{
/*
TestOrcFile.testDate1900.orc
struct<time:timestamp,date:date>
{1900-05-05 12:34:56.1, 1900-12-25}
*/
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/testDate1900_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
OrcStruct data = getFirstRow(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
assertEquals(1, rows.get(0).getDimensions().size());
assertEquals("1900-12-25T00:00:00.000Z", rows.get(0).getDimension("date").get(0));
assertEquals(DateTimes.of("1900-05-05T12:34:56.1Z"), rows.get(0).getTimestamp());
}
@Test
public void testDate2038() throws IOException, InterruptedException
{
/*
TestOrcFile.testDate2038.orc
struct<time:timestamp,date:date>
{2038-05-05 12:34:56.1, 2038-12-25}
*/
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/testDate2038_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
OrcStruct data = getFirstRow(job, ((StaticPathSpec) config.getPathSpec()).getPaths());
List<InputRow> rows = (List<InputRow>) config.getParser().parseBatch(data);
assertEquals(1, rows.get(0).getDimensions().size());
assertEquals("2038-12-25T00:00:00.000Z", rows.get(0).getDimension("date").get(0));
assertEquals(DateTimes.of("2038-05-05T12:34:56.1Z"), rows.get(0).getTimestamp());
}
private static HadoopDruidIndexerConfig loadHadoopDruidIndexerConfig(String configPath)
{
return HadoopDruidIndexerConfig.fromFile(new File(configPath));
}
private static OrcStruct getFirstRow(Job job, String orcPath) throws IOException, InterruptedException
{
File testFile = new File(orcPath);
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
InputFormat inputFormat = ReflectionUtils.newInstance(
OrcInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
reader.initialize(split, context);
reader.nextKeyValue();
return (OrcStruct) reader.getCurrentValue();
}
}
private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config)
throws IOException, InterruptedException
{
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
File testFile = new File(((StaticPathSpec) config.getPathSpec()).getPaths());
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
InputFormat inputFormat = ReflectionUtils.newInstance(
OrcInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
List<InputRow> records = new ArrayList<>();
InputRowParser parser = config.getParser();
reader.initialize(split, context);
while (reader.nextKeyValue()) {
// advance exactly once per iteration so every record in the file is returned
Object data = reader.getCurrentValue();
records.add(((List<InputRow>) parser.parseBatch(data)).get(0));
}
return records;
}
}
}

View File

@ -94,7 +94,6 @@
<slf4j.version>1.7.12</slf4j.version>
<!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
<hadoop.compile.version>2.8.3</hadoop.compile.version>
<hive.version>2.0.0</hive.version>
<powermock.version>1.6.6</powermock.version>
<aws.sdk.version>1.11.199</aws.sdk.version>
<caffeine.version>2.5.5</caffeine.version>
@ -141,6 +140,7 @@
<module>extensions-core/kafka-indexing-service</module>
<module>extensions-core/kinesis-indexing-service</module>
<module>extensions-core/mysql-metadata-storage</module>
<module>extensions-core/orc-extensions</module>
<module>extensions-core/parquet-extensions</module>
<module>extensions-core/postgresql-metadata-storage</module>
<module>extensions-core/protobuf-extensions</module>
@ -162,7 +162,6 @@
<module>extensions-contrib/rabbitmq</module>
<module>extensions-contrib/distinctcount</module>
<module>extensions-contrib/statsd-emitter</module>
<module>extensions-contrib/orc-extensions</module>
<module>extensions-contrib/time-min-max</module>
<module>extensions-contrib/virtual-columns</module>
<module>extensions-contrib/thrift-extensions</module>