diff --git a/core/src/main/java/org/apache/druid/data/input/Rows.java b/core/src/main/java/org/apache/druid/data/input/Rows.java
index 8514805ca93..7435272a188 100644
--- a/core/src/main/java/org/apache/druid/data/input/Rows.java
+++ b/core/src/main/java/org/apache/druid/data/input/Rows.java
@@ -66,6 +66,9 @@ public final class Rows
} else if (inputValue instanceof List) {
// guava's toString function fails on null objects, so please do not use it
return ((List<?>) inputValue).stream().map(String::valueOf).collect(Collectors.toList());
+ } else if (inputValue instanceof byte[]) {
+ // convert byte[] to base64 encoded string
+ return Collections.singletonList(StringUtils.encodeBase64String((byte[]) inputValue));
} else {
return Collections.singletonList(String.valueOf(inputValue));
}
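
For context on the `Rows.java` change above: with the new branch, a `byte[]` input value becomes a single-element list holding its Base64 encoding (via Druid's `StringUtils.encodeBase64String`) instead of falling through to `String.valueOf`, which would only yield the unhelpful `[B@...` form. The following standalone sketch (class and method names are made up for illustration, and it uses the JDK's `java.util.Base64` rather than the Druid helper) shows the same behavior:

```java
import java.util.Base64;
import java.util.Collections;
import java.util.List;

class ByteArrayDimensionSketch
{
  // Mirrors the new branch: byte[] becomes one Base64 string, everything else String.valueOf.
  static List<String> toDimensionValues(Object inputValue)
  {
    if (inputValue instanceof byte[]) {
      return Collections.singletonList(Base64.getEncoder().encodeToString((byte[]) inputValue));
    }
    return Collections.singletonList(String.valueOf(inputValue));
  }

  public static void main(String[] args)
  {
    System.out.println(toDimensionValues(new byte[]{1, 2, 3})); // prints [AQID]
  }
}
```
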
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 7579b46b6d8..5f870817a11 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -170,6 +170,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions:mysql-metadata-storage</argument>
<argument>-c</argument>
+<argument>org.apache.druid.extensions:druid-orc-extensions</argument>
+<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-parquet-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:postgresql-metadata-storage</argument>
@@ -318,8 +320,6 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-opentsdb-emitter</argument>
<argument>-c</argument>
-<argument>org.apache.druid.extensions.contrib:druid-orc-extensions</argument>
-<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-rabbitmq</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-redis-cache</argument>
diff --git a/docs/content/development/extensions-contrib/orc.md b/docs/content/development/extensions-contrib/orc.md
deleted file mode 100644
index b5a56bb57fd..00000000000
--- a/docs/content/development/extensions-contrib/orc.md
+++ /dev/null
@@ -1,113 +0,0 @@
----
-layout: doc_page
-title: "ORC"
----
-
-
-
-# ORC
-
-To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`.
-
-This extension enables Druid to ingest and understand the Apache ORC data format offline.
-
-## ORC Hadoop Parser
-
-This is for batch ingestion using the HadoopDruidIndexer. The inputFormat of inputSpec in ioConfig must be set to `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"`.
-
-|Field | Type | Description | Required|
-|----------|-------------|----------------------------------------------------------------------------------------|---------|
-|type | String | This should say `orc` | yes|
-|parseSpec | JSON Object | Specifies the timestamp and dimensions of the data. Any parse spec that extends ParseSpec is possible but only their TimestampSpec and DimensionsSpec are used. | yes|
-|typeString| String | String representation of ORC struct type info. If not specified, auto constructed from parseSpec but all metric columns are dropped | no|
-|mapFieldNameFormat| String | String format for resolving the flatten map fields. Default is `<PARENT>_<CHILD>`. | no |
-
-For example of `typeString`, string column col1 and array of string column col2 is represented by `"struct<col1:string,col2:array<string>>"`.
-
-Currently, it only supports java primitive types, array of java primitive types and map of java primitive types. Thus, compound types 'list' and 'map' in [ORC types](https://orc.apache.org/docs/types.html) are supported. Note that, list of list is not supported, nor map of compound types. For map types, values will be exploded to several columns where column names will be resolved via `mapFieldNameFormat`.
-
-For example of hadoop indexing:
-
-```json
-{
- "type": "index_hadoop",
- "spec": {
- "ioConfig": {
- "type": "hadoop",
- "inputSpec": {
- "type": "static",
- "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
- "paths": "/data/path/in/HDFS/"
- },
- "metadataUpdateSpec": {
- "type": "postgresql",
- "connectURI": "jdbc:postgresql://localhost/druid",
- "user" : "druid",
- "password" : "asdf",
- "segmentTable": "druid_segments"
- },
- "segmentOutputPath": "tmp/segments"
- },
- "dataSchema": {
- "dataSource": "no_metrics",
- "parser": {
- "type": "orc",
- "parseSpec": {
- "format": "timeAndDims",
- "timestampSpec": {
- "column": "time",
- "format": "auto"
- },
- "dimensionsSpec": {
- "dimensions": [
- "name"
- ],
- "dimensionExclusions": [],
- "spatialDimensions": []
- }
- },
- "typeString": "struct<time:string,name:string>",
- "mapFieldNameFormat": "<PARENT>_<CHILD>"
- },
- "metricsSpec": [{
- "type": "count",
- "name": "count"
- }],
- "granularitySpec": {
- "type": "uniform",
- "segmentGranularity": "DAY",
- "queryGranularity": "ALL",
- "intervals": ["2015-12-31/2016-01-02"]
- }
- },
- "tuningConfig": {
- "type": "hadoop",
- "workingPath": "tmp/working_path",
- "partitionsSpec": {
- "targetPartitionSize": 5000000
- },
- "jobProperties" : {},
- "leaveIntermediate": true
- }
- }
-}
-```
-
-Almost all the fields listed above are required, including `inputFormat`, `metadataUpdateSpec`(`type`, `connectURI`, `user`, `password`, `segmentTable`). Set `jobProperties` to make hdfs path timezone unrelated.
diff --git a/docs/content/development/extensions-core/orc.md b/docs/content/development/extensions-core/orc.md
new file mode 100644
index 00000000000..9490ec539ff
--- /dev/null
+++ b/docs/content/development/extensions-core/orc.md
@@ -0,0 +1,311 @@
+---
+layout: doc_page
+title: "Druid ORC Extension"
+---
+
+
+
+# Druid ORC Extension
+
+This module extends [Druid Hadoop based indexing](../../ingestion/hadoop.html) to ingest data directly from offline
+Apache ORC files.
+
+To use this extension, make sure to [include](../../operations/including-extensions.html) `druid-orc-extensions`.
+
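+For example, the extension can be added to the extension load list in `common.runtime.properties` (a minimal sketch;
+any other extensions you use would also appear in this list):
+
+```
+druid.extensions.loadList=["druid-orc-extensions"]
+```
+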
+## ORC Hadoop Parser
+
+The `inputFormat` of `inputSpec` in `ioConfig` must be set to `"org.apache.orc.mapreduce.OrcInputFormat"`.
+
+
+|Field | Type | Description | Required|
+|----------|-------------|----------------------------------------------------------------------------------------|---------|
+|type | String | This should say `orc` | yes|
+|parseSpec | JSON Object | Specifies the timestamp and dimensions of the data (`timeAndDims` and `orc` format) and a `flattenSpec` (`orc` format) | yes|
+
+The parser supports two `parseSpec` formats: `orc` and `timeAndDims`.
+
+`orc` supports auto field discovery and flattening, if specified with a [flattenSpec](../../ingestion/flatten-json.html).
+If no `flattenSpec` is specified, `useFieldDiscovery` will be enabled by default. Specifying a `dimensionsSpec` is
+optional if `useFieldDiscovery` is enabled: if a `dimensionsSpec` is supplied, the list of `dimensions` it defines will be
+the set of ingested dimensions; if it is missing, the discovered fields will make up the list.
+
+The `timeAndDims` parse spec must specify which fields will be extracted as dimensions through the `dimensionsSpec`.
+
+[All column types](https://orc.apache.org/docs/types.html) are supported, with the exception of `union` types. Columns of
+ `list` type, if filled with primitives, may be used as a multi-value dimension, or specific elements can be extracted with
+`flattenSpec` expressions. Likewise, primitive fields may be extracted from `map` and `struct` types in the same manner.
+Auto field discovery will automatically create a string dimension for every (non-timestamp) primitive or `list` of
+primitives, as well as any flatten expressions defined in the `flattenSpec`.
+
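+For instance, a `flattenSpec` along the following lines (the column names here are hypothetical) extracts a primitive
+field from a `struct` column and the first element of a `list` column:
+
+```json
+"flattenSpec": {
+  "useFieldDiscovery": true,
+  "fields": [
+    { "type": "path", "name": "structField", "expr": "$.someStruct.someField" },
+    { "type": "path", "name": "firstListElement", "expr": "$.someList[0]" }
+  ]
+}
+```
+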
+### Hadoop Job Properties
+Like most Hadoop jobs, the best outcomes come from adding `"mapreduce.job.user.classpath.first": "true"` or
+`"mapreduce.job.classloader": "true"` to the `jobProperties` section of `tuningConfig`. Note that if you use
+`"mapreduce.job.classloader": "true"`, you will likely also need to set `mapreduce.job.classloader.system.classes` to include
+`-org.apache.hadoop.hive.` so that Hadoop loads `org.apache.hadoop.hive` classes from the application jars instead
+of the system jars, e.g.
+
+```json
+...
+ "mapreduce.job.classloader": "true",
+ "mapreduce.job.classloader.system.classes" : "java., javax.accessibility., javax.activation., javax.activity., javax.annotation., javax.annotation.processing., javax.crypto., javax.imageio., javax.jws., javax.lang.model., -javax.management.j2ee., javax.management., javax.naming., javax.net., javax.print., javax.rmi., javax.script., -javax.security.auth.message., javax.security.auth., javax.security.cert., javax.security.sasl., javax.sound., javax.sql., javax.swing., javax.tools., javax.transaction., -javax.xml.registry., -javax.xml.rpc., javax.xml., org.w3c.dom., org.xml.sax., org.apache.commons.logging., org.apache.log4j., -org.apache.hadoop.hbase., -org.apache.hadoop.hive., org.apache.hadoop., core-default.xml, hdfs-default.xml, mapred-default.xml, yarn-default.xml",
+...
+```
+
+This is due to the `hive-storage-api` dependency of the
+`orc-mapreduce` library, which provides some classes under the `org.apache.hadoop.hive` package. If instead using the
+setting `"mapreduce.job.user.classpath.first": "true"`, then this will not be an issue.
+
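+If you take the `"mapreduce.job.user.classpath.first": "true"` route instead, the `jobProperties` section of
+`tuningConfig` reduces to something like the following (a sketch; surrounding fields omitted):
+
+```json
+...
+  "jobProperties" : {
+    "mapreduce.job.user.classpath.first" : "true"
+  }
+...
+```
+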
+### Examples
+
+#### `orc` parser, `orc` parseSpec, auto field discovery, flatten expressions
+
+```json
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "path/to/file.orc"
+ },
+ ...
+ },
+ "dataSchema": {
+ "dataSource": "example",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": [
+ {
+ "type": "path",
+ "name": "nestedDim",
+ "expr": "$.nestedData.dim1"
+ },
+ {
+ "type": "path",
+ "name": "listDimFirstItem",
+ "expr": "$.listDim[1]"
+ }
+ ]
+ },
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "millis"
+ }
+ }
+ },
+ ...
+ },
+    "tuningConfig": <hadoop-tuning-config>
+  }
+}
+```
+
+#### `orc` parser, `orc` parseSpec, field discovery with no flattenSpec or dimensionSpec
+
+```json
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "path/to/file.orc"
+ },
+ ...
+ },
+ "dataSchema": {
+ "dataSource": "example",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "millis"
+ }
+ }
+ },
+ ...
+ },
+    "tuningConfig": <hadoop-tuning-config>
+  }
+}
+```
+
+#### `orc` parser, `orc` parseSpec, no autodiscovery
+
+```json
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "path/to/file.orc"
+ },
+ ...
+ },
+ "dataSchema": {
+ "dataSource": "example",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": false,
+ "fields": [
+ {
+ "type": "path",
+ "name": "nestedDim",
+ "expr": "$.nestedData.dim1"
+ },
+ {
+ "type": "path",
+ "name": "listDimFirstItem",
+ "expr": "$.listDim[1]"
+ }
+ ]
+ },
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "millis"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "dim1",
+ "dim3",
+ "nestedDim",
+ "listDimFirstItem"
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ ...
+ },
+    "tuningConfig": <hadoop-tuning-config>
+  }
+}
+```
+
+#### `orc` parser, `timeAndDims` parseSpec
+```json
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "path/to/file.orc"
+ },
+ ...
+ },
+ "dataSchema": {
+ "dataSource": "example",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "timeAndDims",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ "dim1",
+ "dim2",
+ "dim3",
+ "listDim"
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ ...
+ },
+    "tuningConfig": <hadoop-tuning-config>
+ }
+}
+
+```
+
+### Migration from 'contrib' extension
+This extension, first available in version 0.15.0, replaces the previous 'contrib' extension which was available until
+0.14.0-incubating. While this extension can index any data the 'contrib' extension could, the JSON spec for the
+ingestion task is *incompatible*, and will need to be modified to work with the newer 'core' extension.
+
+To migrate to 0.15.0+:
+* In `inputSpec` of `ioConfig`, `inputFormat` must be changed from `"org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat"` to
+`"org.apache.orc.mapreduce.OrcInputFormat"`
+* The 'contrib' extension supported a `typeString` property, which provided the schema of the
+ORC file and was essentially required to have the correct types, but notably _not_ the column names, which
+facilitated column renaming. In the 'core' extension, column renaming can be achieved with
+[`flattenSpec` expressions](../../ingestion/flatten-json.html). For example, `"typeString":"struct<time:string,name:string>"`
+with the actual schema `struct<_col0:string,_col1:string>`, to preserve the Druid schema, would need to be replaced with:
+```json
+"flattenSpec": {
+ "fields": [
+ {
+ "type": "path",
+ "name": "time",
+ "expr": "$._col0"
+ },
+ {
+ "type": "path",
+ "name": "name",
+ "expr": "$._col1"
+ }
+ ]
+ ...
+}
+```
+* The 'contrib' extension supported a `mapFieldNameFormat` property, which provided a way to specify a dimension to
+ flatten `OrcMap` columns with primitive types. This functionality has also been replaced with
+ [`flattenSpec` expressions](../../ingestion/flatten-json.html). For example: `"mapFieldNameFormat": "<PARENT>_<CHILD>"`
+ for a dimension `nestedData_dim1`, to preserve the Druid schema, could be replaced with:
+ ```json
+"flattenSpec": {
+ "fields": [
+ {
+ "type": "path",
+ "name": "nestedData_dim1",
+ "expr": "$.nestedData.dim1"
+ }
+ ]
+ ...
+}
+```
\ No newline at end of file
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 1775edbd725..eca422eaa48 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -55,6 +55,7 @@ Core extensions are maintained by Druid committers.
|druid-kerberos|Kerberos authentication for druid processes.|[link](../development/extensions-core/druid-kerberos.html)|
|druid-lookups-cached-global|A module for [lookups](../querying/lookups.html) providing a jvm-global eager caching for lookups. It provides JDBC and URI implementations for fetching lookup data.|[link](../development/extensions-core/lookups-cached-global.html)|
|druid-lookups-cached-single| Per lookup caching module to support the use cases where a lookup need to be isolated from the global pool of lookups |[link](../development/extensions-core/druid-lookups.html)|
+|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-core/orc.html)|
|druid-parquet-extensions|Support for data in Apache Parquet data format. Requires druid-avro-extensions to be loaded.|[link](../development/extensions-core/parquet.html)|
|druid-protobuf-extensions| Support for data in Protobuf data format.|[link](../development/extensions-core/protobuf.html)|
|druid-s3-extensions|Interfacing with data in AWS S3, and using S3 as deep storage.|[link](../development/extensions-core/s3.html)|
@@ -83,7 +84,6 @@ All of these community extensions can be downloaded using *pull-deps* with the c
|druid-cloudfiles-extensions|Rackspace Cloudfiles deep storage and firehose.|[link](../development/extensions-contrib/cloudfiles.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|druid-kafka-eight-simpleConsumer|Kafka ingest firehose (low level consumer)(deprecated).|[link](../development/extensions-contrib/kafka-simple.html)|
-|druid-orc-extensions|Support for data in Apache Orc data format.|[link](../development/extensions-contrib/orc.html)|
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-redis-cache|A cache implementation for Druid based on Redis.|[link](../development/extensions-contrib/redis-cache.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
diff --git a/extensions-contrib/orc-extensions/pom.xml b/extensions-contrib/orc-extensions/pom.xml
deleted file mode 100644
index 944f061de7f..00000000000
--- a/extensions-contrib/orc-extensions/pom.xml
+++ /dev/null
@@ -1,318 +0,0 @@
-
-
-
- org.apache.druid.extensions.contrib
- druid-orc-extensions
- druid-orc-extensions
- druid-orc-extensions
-
-
- druid
- org.apache.druid
- 0.15.0-incubating-SNAPSHOT
- ../../pom.xml
-
- 4.0.0
-
-
-
- org.apache.druid
- druid-indexing-hadoop
- ${project.parent.version}
- provided
-
-
- org.apache.hadoop
- hadoop-client
- ${hadoop.compile.version}
- provided
-
-
- org.apache.hive
- hive-exec
- ${hive.version}
-
-
- commons-httpclient
- commons-httpclient
-
-
- commons-logging
- commons-logging
-
-
- org.apache.hive
- hive-ant
-
-
- org.apache.hive
- hive-common
-
-
- org.apache.hive
- hive-vector-code-gen
-
-
- org.apache.hive
- hive-metastore
-
-
- org.apache.hive
- hive-service-rpc
-
-
- org.apache.hive
- hive-llap-client
-
-
- org.apache.hive
- hive-llap-tez
-
-
- org.apache.hive
- hive-shims
-
-
- org.apache.hive
- hive-spark-client
-
-
- com.esotericsoftware
- kryo-shaded
-
-
- com.google.protobuf
- protobuf-java
-
-
- org.apache.parquet
- parquet-hadoop-bundle
-
-
- commons-codec
- commons-codec
-
-
- commons-io
- commons-io
-
-
- org.apache.commons
- commons-lang3
-
-
- commons-lang
- commons-lang
-
-
- javolution
- javolution
-
-
- org.apache.logging.log4j
- log4j-1.2-api
-
-
- org.apache.logging.log4j
- log4j-slf4j-impl
-
-
- org.antlr
- antlr-runtime
-
-
- org.antlr
- ST4
-
-
- org.apache.avro
- avro
-
-
- org.apache.avro
- avro-mapred
-
-
- org.apache.ant
- ant
-
-
- org.apache.commons
- commons-compress
-
-
- org.apache.thrift
- libfb303
-
-
- org.apache.hadoop
- hadoop-common
-
-
- org.apache.hadoop
- hadoop-archives
-
-
- org.apache.hadoop
- hadoop-yarn-registry
-
-
- org.apache.hadoop
- hadoop-mapreduce-client-core
-
-
- org.apache.hadoop
- hadoop-mapreduce-client-common
-
-
- org.apache.hadoop
- hadoop-hdfs
-
-
- org.apache.hadoop
- hadoop-yarn-api
-
-
- org.apache.hadoop
- hadoop-yarn-common
-
-
- org.apache.hadoop
- hadoop-yarn-client
-
-
- org.apache.orc
- orc-tools
-
-
- org.apache.ivy
- ivy
-
-
- org.apache.thrift
- libthrift
-
-
- org.apache.zookeeper
- zookeeper
-
-
- org.apache.curator
- curator-framework
-
-
- org.apache.curator
- apache-curator
-
-
- org.codehaus.groovy
- groovy-all
-
-
- org.jodd
- jodd-core
-
-
- com.fasterxml.jackson.core
- jackson-annotations
-
-
- com.fasterxml.jackson.core
- jackson-core
-
-
- com.fasterxml.jackson.core
- jackson-databind
-
-
- org.datanucleus
- datanucleus-core
-
-
- org.apache.calcite
- calcite-core
-
-
- org.apache.calcite
- calcite-druid
-
-
- org.apache.calcite
- calcite-avatica
-
-
- org.apache.calcite.avatica
- avatica
-
-
- com.google.guava
- guava
-
-
- com.googlecode.javaewah
- JavaEWAH
-
-
- com.google.code.gson
- gson
-
-
- com.tdunning
- json
-
-
- stax
- stax-api
-
-
- net.sf.opencsv
- opencsv
-
-
- org.apache.hive
- hive-standalone-metastore-server
-
-
- org.slf4j
- slf4j-api
-
-
- oro
- oro
-
-
- org.apache.velocity
- velocity
-
-
- jline
- jline
-
-
-
-
- junit
- junit
- test
-
-
-
diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
deleted file mode 100644
index 76190806a44..00000000000
--- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input.orc;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.ParseSpec;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.io.DateWritable;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.joda.time.DateTime;
-
-import javax.annotation.Nullable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
-{
-
- static final String MAP_CHILD_TAG = "<CHILD>";
- static final String MAP_PARENT_TAG = "<PARENT>";
- static final String DEFAULT_MAP_FIELD_NAME_FORMAT = MAP_PARENT_TAG + "_" + MAP_CHILD_TAG;
-
-
- private final ParseSpec parseSpec;
- private final String typeString;
- private final String mapFieldNameFormat;
- private final String mapParentFieldNameFormat;
- private final List<String> dimensions;
- private final StructObjectInspector oip;
-
-
-
- @JsonCreator
- public OrcHadoopInputRowParser(
- @JsonProperty("parseSpec") ParseSpec parseSpec,
- @JsonProperty("typeString") String typeString,
- @JsonProperty("mapFieldNameFormat") String mapFieldNameFormat
- )
- {
- this.parseSpec = parseSpec;
- this.typeString = typeString == null ? typeStringFromParseSpec(parseSpec) : typeString;
- this.mapFieldNameFormat =
- mapFieldNameFormat == null ||
- !mapFieldNameFormat.contains(MAP_PARENT_TAG) ||
- !mapFieldNameFormat.contains(MAP_CHILD_TAG) ? DEFAULT_MAP_FIELD_NAME_FORMAT : mapFieldNameFormat;
- this.mapParentFieldNameFormat = StringUtils.replace(this.mapFieldNameFormat, MAP_PARENT_TAG, "%s");
- this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
- this.oip = makeObjectInspector(this.typeString);
- }
-
- @SuppressWarnings("ArgumentParameterSwap")
- @Override
- public List<InputRow> parseBatch(OrcStruct input)
- {
- Map<String, Object> map = new HashMap<>();
- List<? extends StructField> fields = oip.getAllStructFieldRefs();
- for (StructField field : fields) {
- ObjectInspector objectInspector = field.getFieldObjectInspector();
- switch (objectInspector.getCategory()) {
- case PRIMITIVE:
- PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector) objectInspector;
- map.put(
- field.getFieldName(),
- coercePrimitiveObject(
- primitiveObjectInspector,
- oip.getStructFieldData(input, field)
- )
- );
- break;
- case LIST: // array case - only 1-depth array supported yet
- ListObjectInspector listObjectInspector = (ListObjectInspector) objectInspector;
- map.put(
- field.getFieldName(),
- getListObject(listObjectInspector, oip.getStructFieldData(input, field))
- );
- break;
- case MAP:
- MapObjectInspector mapObjectInspector = (MapObjectInspector) objectInspector;
- getMapObject(field.getFieldName(), mapObjectInspector, oip.getStructFieldData(input, field), map);
- break;
- default:
- break;
- }
- }
-
- TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
- DateTime dateTime = timestampSpec.extractTimestamp(map);
-
- final List<String> dimensions;
- if (!this.dimensions.isEmpty()) {
- dimensions = this.dimensions;
- } else {
- dimensions = Lists.newArrayList(
- Sets.difference(map.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
- );
- }
- return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map));
- }
-
- private List getListObject(ListObjectInspector listObjectInspector, Object listObject)
- {
- if (listObjectInspector.getListLength(listObject) < 0) {
- return null;
- }
- List<?> objectList = listObjectInspector.getList(listObject);
- List<Object> list = null;
- ObjectInspector child = listObjectInspector.getListElementObjectInspector();
- switch (child.getCategory()) {
- case PRIMITIVE:
- final PrimitiveObjectInspector primitiveObjectInspector = (PrimitiveObjectInspector) child;
- list = objectList.stream()
- .map(input -> coercePrimitiveObject(primitiveObjectInspector, input))
- .collect(Collectors.toList());
- break;
- default:
- break;
- }
-
- return list;
- }
-
- private void getMapObject(String parentName, MapObjectInspector mapObjectInspector, Object mapObject, Map parsedMap)
- {
- if (mapObjectInspector.getMapSize(mapObject) < 0) {
- return;
- }
- String mapChildFieldNameFormat = StringUtils.replace(
- StringUtils.format(mapParentFieldNameFormat, parentName),
- MAP_CHILD_TAG,
- "%s"
- );
-
- Map objectMap = mapObjectInspector.getMap(mapObject);
- PrimitiveObjectInspector key = (PrimitiveObjectInspector) mapObjectInspector.getMapKeyObjectInspector();
- PrimitiveObjectInspector value = (PrimitiveObjectInspector) mapObjectInspector.getMapValueObjectInspector();
-
- objectMap.forEach((k, v) -> {
- String resolvedFieldName = StringUtils.format(mapChildFieldNameFormat, key.getPrimitiveJavaObject(k).toString());
- parsedMap.put(resolvedFieldName, value.getPrimitiveJavaObject(v));
- });
- }
-
- @JsonProperty
- public String getMapFieldNameFormat()
- {
- return mapFieldNameFormat;
- }
-
- @Override
- @JsonProperty
- public ParseSpec getParseSpec()
- {
- return parseSpec;
- }
-
- @JsonProperty
- public String getTypeString()
- {
- return typeString;
- }
-
- @Override
- public InputRowParser withParseSpec(ParseSpec parseSpec)
- {
- return new OrcHadoopInputRowParser(parseSpec, typeString, null);
- }
-
- @Override
- public boolean equals(final Object o)
- {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- final OrcHadoopInputRowParser that = (OrcHadoopInputRowParser) o;
- return Objects.equals(parseSpec, that.parseSpec) &&
- Objects.equals(typeString, that.typeString);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(parseSpec, typeString);
- }
-
- @Override
- public String toString()
- {
- return "OrcHadoopInputRowParser{" +
- "parseSpec=" + parseSpec +
- ", typeString='" + typeString + '\'' +
- '}';
- }
-
- @VisibleForTesting
- static String typeStringFromParseSpec(ParseSpec parseSpec)
- {
- StringBuilder builder = new StringBuilder("struct<");
- builder.append(parseSpec.getTimestampSpec().getTimestampColumn()).append(":string");
- // the typeString seems positionally dependent, so repeated timestamp column causes incorrect mapping
- if (parseSpec.getDimensionsSpec().getDimensionNames().size() > 0) {
- builder.append(
- parseSpec
- .getDimensionsSpec()
- .getDimensionNames()
- .stream()
- .filter(s -> !s.equals(parseSpec.getTimestampSpec().getTimestampColumn()))
- .collect(Collectors.joining(":string,", ",", ":string"))
- );
- }
- builder.append(">");
-
- return builder.toString();
- }
-
- private static Object coercePrimitiveObject(final PrimitiveObjectInspector inspector, final Object object)
- {
- if (object instanceof HiveDecimalWritable) {
- // inspector on HiveDecimal rounds off to integer for some reason.
- return ((HiveDecimalWritable) object).getHiveDecimal().doubleValue();
- } else if (object instanceof DateWritable) {
- return object.toString();
- } else {
- return inspector.getPrimitiveJavaObject(object);
- }
- }
-
- private static StructObjectInspector makeObjectInspector(final String typeString)
- {
- final OrcSerde serde = new OrcSerde();
-
- TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeString);
- Preconditions.checkArgument(
- typeInfo instanceof StructTypeInfo,
- StringUtils.format("typeString should be struct type but not [%s]", typeString)
- );
- Properties table = getTablePropertiesFromStructTypeInfo((StructTypeInfo) typeInfo);
- serde.initialize(new Configuration(), table);
- try {
- return (StructObjectInspector) serde.getObjectInspector();
- }
- catch (SerDeException e) {
- throw new RuntimeException(e);
- }
- }
-
- private static Properties getTablePropertiesFromStructTypeInfo(StructTypeInfo structTypeInfo)
- {
- Properties table = new Properties();
- table.setProperty("columns", String.join(",", structTypeInfo.getAllStructFieldNames()));
- table.setProperty("columns.types", String.join(
- ",",
- Lists.transform(
- structTypeInfo.getAllStructFieldTypeInfos(),
- new Function<TypeInfo, String>()
- {
- @Nullable
- @Override
- public String apply(@Nullable TypeInfo typeInfo)
- {
- return typeInfo.getTypeName();
- }
- }
- )
- ));
-
- return table;
- }
-}
diff --git a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java
deleted file mode 100644
index 57462d2ef20..00000000000
--- a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/DruidOrcInputFormatTest.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input.orc;
-
-import org.apache.druid.data.input.MapBasedInputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.joda.time.DateTime;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-public class DruidOrcInputFormatTest
-{
- @Rule
- public final TemporaryFolder temporaryFolder = new TemporaryFolder();
- String timestamp = "2016-01-01T00:00:00.000Z";
- String col1 = "bar";
- String[] col2 = {"dat1", "dat2", "dat3"};
- double val1 = 1.1;
- Job job;
- HadoopDruidIndexerConfig config;
- File testFile;
- Path path;
- FileSplit split;
-
- @Before
- public void setUp() throws IOException
- {
- Configuration conf = new Configuration();
- job = Job.getInstance(conf);
-
- config = HadoopDruidIndexerConfig.fromFile(new File(
- "example/hadoop_orc_job.json"));
-
- config.intoConfiguration(job);
-
- testFile = makeOrcFile();
- path = new Path(testFile.getAbsoluteFile().toURI());
- split = new FileSplit(path, 0, testFile.length(), null);
-
- }
-
- @Test
- public void testRead() throws IOException, InterruptedException
- {
- InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
-
- TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader reader = inputFormat.createRecordReader(split, context);
- InputRowParser parser = (InputRowParser) config.getParser();
-
- reader.initialize(split, context);
-
- reader.nextKeyValue();
-
- OrcStruct data = (OrcStruct) reader.getCurrentValue();
-
- MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
-
- Assert.assertTrue(row.getEvent().keySet().size() == 4);
- Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());
- Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
- Assert.assertEquals(col1, row.getEvent().get("col1"));
- Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
-
- reader.close();
- }
-
- @Test
- public void testReadDateColumn() throws IOException, InterruptedException
- {
- File testFile2 = makeOrcFileWithDate();
- Path path = new Path(testFile2.getAbsoluteFile().toURI());
- FileSplit split = new FileSplit(path, 0, testFile2.length(), null);
-
- InputFormat inputFormat = ReflectionUtils.newInstance(OrcNewInputFormat.class, job.getConfiguration());
-
- TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
- RecordReader reader = inputFormat.createRecordReader(split, context);
- InputRowParser parser = (InputRowParser) config.getParser();
-
- reader.initialize(split, context);
-
- reader.nextKeyValue();
-
- OrcStruct data = (OrcStruct) reader.getCurrentValue();
-
- MapBasedInputRow row = (MapBasedInputRow) parser.parseBatch(data).get(0);
-
- Assert.assertTrue(row.getEvent().keySet().size() == 4);
- Assert.assertEquals(DateTimes.of(timestamp), row.getTimestamp());
- Assert.assertEquals(parser.getParseSpec().getDimensionsSpec().getDimensionNames(), row.getDimensions());
- Assert.assertEquals(col1, row.getEvent().get("col1"));
- Assert.assertEquals(Arrays.asList(col2), row.getDimension("col2"));
-
- reader.close();
- }
-
- private File makeOrcFile() throws IOException
- {
- final File dir = temporaryFolder.newFolder();
- final File testOrc = new File(dir, "test.orc");
- TypeDescription schema = TypeDescription.createStruct()
- .addField("timestamp", TypeDescription.createString())
- .addField("col1", TypeDescription.createString())
- .addField("col2", TypeDescription.createList(TypeDescription.createString()))
- .addField("val1", TypeDescription.createFloat());
- Configuration conf = new Configuration();
- Writer writer = OrcFile.createWriter(
- new Path(testOrc.getPath()),
- OrcFile.writerOptions(conf)
- .setSchema(schema)
- .stripeSize(100000)
- .bufferSize(10000)
- .compress(CompressionKind.ZLIB)
- .version(OrcFile.Version.CURRENT)
- );
- VectorizedRowBatch batch = schema.createRowBatch();
- batch.size = 1;
- ((BytesColumnVector) batch.cols[0]).setRef(
- 0,
- StringUtils.toUtf8(timestamp),
- 0,
- timestamp.length()
- );
- ((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length());
-
- ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
- listColumnVector.childCount = col2.length;
- listColumnVector.lengths[0] = 3;
- for (int idx = 0; idx < col2.length; idx++) {
- ((BytesColumnVector) listColumnVector.child).setRef(
- idx,
- StringUtils.toUtf8(col2[idx]),
- 0,
- col2[idx].length()
- );
- }
-
- ((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
- writer.addRowBatch(batch);
- writer.close();
-
- return testOrc;
- }
-
- private File makeOrcFileWithDate() throws IOException
- {
- final File dir = temporaryFolder.newFolder();
- final File testOrc = new File(dir, "test-2.orc");
- TypeDescription schema = TypeDescription.createStruct()
- .addField("timestamp", TypeDescription.createDate())
- .addField("col1", TypeDescription.createString())
- .addField("col2", TypeDescription.createList(TypeDescription.createString()))
- .addField("val1", TypeDescription.createFloat());
- Configuration conf = new Configuration();
- Writer writer = OrcFile.createWriter(
- new Path(testOrc.getPath()),
- OrcFile.writerOptions(conf)
- .setSchema(schema)
- .stripeSize(100000)
- .bufferSize(10000)
- .compress(CompressionKind.ZLIB)
- .version(OrcFile.Version.CURRENT)
- );
- VectorizedRowBatch batch = schema.createRowBatch();
- batch.size = 1;
- DateTime ts = DateTimes.of(timestamp);
-
- // date is stored as long column vector with number of days since epoch
- ((LongColumnVector) batch.cols[0]).vector[0] =
- TimeUnit.MILLISECONDS.toDays(ts.minus(DateTimes.EPOCH.getMillis()).getMillis());
-
- ((BytesColumnVector) batch.cols[1]).setRef(0, StringUtils.toUtf8(col1), 0, col1.length());
-
- ListColumnVector listColumnVector = (ListColumnVector) batch.cols[2];
- listColumnVector.childCount = col2.length;
- listColumnVector.lengths[0] = 3;
- for (int idx = 0; idx < col2.length; idx++) {
- ((BytesColumnVector) listColumnVector.child).setRef(
- idx,
- StringUtils.toUtf8(col2[idx]),
- 0,
- col2[idx].length()
- );
- }
-
- ((DoubleColumnVector) batch.cols[3]).vector[0] = val1;
- writer.addRowBatch(batch);
- writer.close();
-
- return testOrc;
- }
-}
diff --git a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
deleted file mode 100644
index 3b7f56928bb..00000000000
--- a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParserTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input.orc;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.inject.Binder;
-import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.google.inject.name.Names;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.ParseSpec;
-import org.apache.druid.data.input.impl.StringDimensionSchema;
-import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.guice.GuiceInjectors;
-import org.apache.druid.initialization.Initialization;
-import org.apache.druid.jackson.DefaultObjectMapper;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-
-public class OrcHadoopInputRowParserTest
-{
- Injector injector;
- ObjectMapper mapper = new DefaultObjectMapper();
-
- @Before
- public void setUp()
- {
- injector = Initialization.makeInjectorWithModules(
- GuiceInjectors.makeStartupInjector(),
- ImmutableList.of(
- new Module()
- {
- @Override
- public void configure(Binder binder)
- {
- binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
- binder.bindConstant().annotatedWith(Names.named("servicePort")).to(0);
- binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(-1);
- }
- },
- new OrcExtensionsModule()
- )
- );
- mapper = injector.getInstance(ObjectMapper.class);
- }
-
- @Test
- public void testSerde() throws IOException
- {
- String parserString = "{\n" +
- " \"type\": \"orc\",\n" +
- " \"parseSpec\": {\n" +
- " \"format\": \"timeAndDims\",\n" +
- " \"timestampSpec\": {\n" +
- " \"column\": \"timestamp\",\n" +
- " \"format\": \"auto\"\n" +
- " },\n" +
- " \"dimensionsSpec\": {\n" +
- " \"dimensions\": [\n" +
- " \"col1\",\n" +
- " \"col2\"\n" +
- " ],\n" +
- " \"dimensionExclusions\": [],\n" +
- " \"spatialDimensions\": []\n" +
- " }\n" +
- " },\n" +
- " \"typeString\": \"struct<timestamp:string,col1:string,col2:array<string>,val1:float>\"\n" +
- " }";
-
- InputRowParser parser = mapper.readValue(parserString, InputRowParser.class);
- InputRowParser expected = new OrcHadoopInputRowParser(
- new TimeAndDimsParseSpec(
- new TimestampSpec(
- "timestamp",
- "auto",
- null
- ),
- new DimensionsSpec(
- ImmutableList.of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")),
- null,
- null
- )
- ),
- "struct<timestamp:string,col1:string,col2:array<string>,val1:float>",
- null
- );
-
- Assert.assertEquals(expected, parser);
- }
-
- @Test
- public void testTypeFromParseSpec()
- {
- ParseSpec parseSpec = new TimeAndDimsParseSpec(
- new TimestampSpec(
- "timestamp",
- "auto",
- null
- ),
- new DimensionsSpec(
- ImmutableList.of(new StringDimensionSchema("col1"), new StringDimensionSchema("col2")),
- null,
- null
- )
- );
- String typeString = OrcHadoopInputRowParser.typeStringFromParseSpec(parseSpec);
- String expected = "struct<timestamp:string,col1:string,col2:string>";
-
- Assert.assertEquals(expected, typeString);
- }
-
- @Test
- public void testParse()
- {
- final String typeString = "struct<timestamp:string,col1:string,col2:array<string>,col3:float,col4:bigint,col5:decimal,col6:array<int>,col7:map<string,string>>";
- final OrcHadoopInputRowParser parser = new OrcHadoopInputRowParser(
- new TimeAndDimsParseSpec(
- new TimestampSpec("timestamp", "auto", null),
- new DimensionsSpec(null, null, null)
- ),
- typeString,
- "-"
- );
-
- final SettableStructObjectInspector oi = (SettableStructObjectInspector) OrcStruct.createObjectInspector(
- TypeInfoUtils.getTypeInfoFromTypeString(typeString)
- );
- final OrcStruct struct = (OrcStruct) oi.create();
- struct.setNumFields(8);
- oi.setStructFieldData(struct, oi.getStructFieldRef("timestamp"), new Text("2000-01-01"));
- oi.setStructFieldData(struct, oi.getStructFieldRef("col1"), new Text("foo"));
- oi.setStructFieldData(struct, oi.getStructFieldRef("col2"), ImmutableList.of(new Text("foo"), new Text("bar")));
- oi.setStructFieldData(struct, oi.getStructFieldRef("col3"), new FloatWritable(1.5f));
- oi.setStructFieldData(struct, oi.getStructFieldRef("col4"), new LongWritable(2));
- oi.setStructFieldData(
- struct,
- oi.getStructFieldRef("col5"),
- new HiveDecimalWritable(HiveDecimal.create(BigDecimal.valueOf(3.5d)))
- );
- oi.setStructFieldData(struct, oi.getStructFieldRef("col6"), null);
- oi.setStructFieldData(struct, oi.getStructFieldRef("col7"), ImmutableMap.of(new Text("subcol7"), new Text("subval7")));
-
- final InputRow row = parser.parseBatch(struct).get(0);
- Assert.assertEquals("timestamp", DateTimes.of("2000-01-01"), row.getTimestamp());
- Assert.assertEquals("col1", "foo", row.getRaw("col1"));
- Assert.assertEquals("col2", ImmutableList.of("foo", "bar"), row.getRaw("col2"));
- Assert.assertEquals("col3", 1.5f, row.getRaw("col3"));
- Assert.assertEquals("col4", 2L, row.getRaw("col4"));
- Assert.assertEquals("col5", 3.5d, row.getRaw("col5"));
- Assert.assertNull("col6", row.getRaw("col6"));
- Assert.assertEquals("col7-subcol7", "subval7", row.getRaw("col7-subcol7"));
- }
-}
diff --git a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcIndexGeneratorJobTest.java
deleted file mode 100644
index 6108fbad490..00000000000
--- a/extensions-contrib/orc-extensions/src/test/java/org/apache/druid/data/input/orc/OrcIndexGeneratorJobTest.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.data.input.orc;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
-import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.indexer.HadoopDruidIndexerConfig;
-import org.apache.druid.indexer.HadoopIOConfig;
-import org.apache.druid.indexer.HadoopIngestionSpec;
-import org.apache.druid.indexer.HadoopTuningConfig;
-import org.apache.druid.indexer.HadoopyShardSpec;
-import org.apache.druid.indexer.IndexGeneratorJob;
-import org.apache.druid.indexer.JobHelper;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
-import org.apache.druid.segment.QueryableIndex;
-import org.apache.druid.segment.QueryableIndexIndexableAdapter;
-import org.apache.druid.segment.RowIterator;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.orc.CompressionKind;
-import org.apache.orc.OrcFile;
-import org.apache.orc.TypeDescription;
-import org.apache.orc.Writer;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeComparator;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-public class OrcIndexGeneratorJobTest
-{
- private static final AggregatorFactory[] aggs = {
- new LongSumAggregatorFactory("visited_num", "visited_num"),
- new HyperUniquesAggregatorFactory("unique_hosts", "host")
- };
-
- @Rule
- public final TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- private ObjectMapper mapper;
- private HadoopDruidIndexerConfig config;
- private final String dataSourceName = "website";
- private final List<String> data = ImmutableList.of(
- "2014102200,a.example.com,100",
- "2014102200,b.exmaple.com,50",
- "2014102200,c.example.com,200",
- "2014102200,d.example.com,250",
- "2014102200,e.example.com,123",
- "2014102200,f.example.com,567",
- "2014102200,g.example.com,11",
- "2014102200,h.example.com,251",
- "2014102200,i.example.com,963",
- "2014102200,j.example.com,333",
- "2014102212,a.example.com,100",
- "2014102212,b.exmaple.com,50",
- "2014102212,c.example.com,200",
- "2014102212,d.example.com,250",
- "2014102212,e.example.com,123",
- "2014102212,f.example.com,567",
- "2014102212,g.example.com,11",
- "2014102212,h.example.com,251",
- "2014102212,i.example.com,963",
- "2014102212,j.example.com,333"
- );
- private final Interval interval = Intervals.of("2014-10-22T00:00:00Z/P1D");
- private File dataRoot;
- private File outputRoot;
- private Integer[][][] shardInfoForEachSegment = new Integer[][][]{
- {
- {0, 4},
- {1, 4},
- {2, 4},
- {3, 4}
- }
- };
- private final InputRowParser inputRowParser = new OrcHadoopInputRowParser(
- new TimeAndDimsParseSpec(
- new TimestampSpec("timestamp", "yyyyMMddHH", null),
- new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null)
- ),
- "struct<timestamp:string,host:string,visited_num:int>",
- null
- );
-
- private File writeDataToLocalOrcFile(File outputDir, List data) throws IOException
- {
- File outputFile = new File(outputDir, "test.orc");
- TypeDescription schema = TypeDescription.createStruct()
- .addField("timestamp", TypeDescription.createString())
- .addField("host", TypeDescription.createString())
- .addField("visited_num", TypeDescription.createInt());
- Configuration conf = new Configuration();
- Writer writer = OrcFile.createWriter(
- new Path(outputFile.getPath()),
- OrcFile.writerOptions(conf)
- .setSchema(schema)
- .stripeSize(100000)
- .bufferSize(10000)
- .compress(CompressionKind.ZLIB)
- .version(OrcFile.Version.CURRENT)
- );
- VectorizedRowBatch batch = schema.createRowBatch();
- batch.size = data.size();
- for (int idx = 0; idx < data.size(); idx++) {
- String line = data.get(idx);
- String[] lineSplit = line.split(",");
- ((BytesColumnVector) batch.cols[0]).setRef(
- idx,
- StringUtils.toUtf8(lineSplit[0]),
- 0,
- lineSplit[0].length()
- );
- ((BytesColumnVector) batch.cols[1]).setRef(
- idx,
- StringUtils.toUtf8(lineSplit[1]),
- 0,
- lineSplit[1].length()
- );
- ((LongColumnVector) batch.cols[2]).vector[idx] = Long.parseLong(lineSplit[2]);
- }
- writer.addRowBatch(batch);
- writer.close();
-
- return outputFile;
- }
-
- @Before
- public void setUp() throws Exception
- {
- mapper = HadoopDruidIndexerConfig.JSON_MAPPER;
- mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
-
- dataRoot = temporaryFolder.newFolder("data");
- outputRoot = temporaryFolder.newFolder("output");
- File dataFile = writeDataToLocalOrcFile(dataRoot, data);
-
- HashMap inputSpec = new HashMap();
- inputSpec.put("paths", dataFile.getCanonicalPath());
- inputSpec.put("type", "static");
- inputSpec.put("inputFormat", "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat");
-
- config = new HadoopDruidIndexerConfig(
- new HadoopIngestionSpec(
- new DataSchema(
- dataSourceName,
- mapper.convertValue(
- inputRowParser,
- Map.class
- ),
- aggs,
- new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, ImmutableList.of(this.interval)),
- null,
- mapper
- ),
- new HadoopIOConfig(
- ImmutableMap.copyOf(inputSpec),
- null,
- outputRoot.getCanonicalPath()
- ),
- new HadoopTuningConfig(
- outputRoot.getCanonicalPath(),
- null,
- null,
- null,
- null,
- null,
- null,
- true,
- false,
- false,
- false,
- ImmutableMap.of(MRJobConfig.NUM_REDUCES, "0"), //verifies that set num reducers is ignored
- false,
- true,
- null,
- true,
- null,
- false,
- false,
- null,
- null,
- null
- )
- )
- );
- config.setShardSpecs(
- loadShardSpecs(shardInfoForEachSegment)
- );
- config = HadoopDruidIndexerConfig.fromSpec(config.getSchema());
- }
-
- @Test
- public void testIndexGeneratorJob() throws IOException
- {
- verifyJob(new IndexGeneratorJob(config));
- }
-
- private void verifyJob(IndexGeneratorJob job) throws IOException
- {
- Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job), config));
-
- final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
- IndexGeneratorJob
- .getPublishedSegments(config)
- .forEach(segment -> intervalToSegments.computeIfAbsent(segment.getInterval(), k -> new ArrayList<>())
- .add(segment));
-
- final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
- int segmentNum = 0;
- for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) {
- Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
- File segmentOutputFolder = new File(
- StringUtils.format(
- "%s/%s/%s_%s/%s",
- config.getSchema().getIOConfig().getSegmentOutputPath(),
- config.getSchema().getDataSchema().getDataSource(),
- currTime.toString(),
- currTime.plusDays(1).toString(),
- config.getSchema().getTuningConfig().getVersion()
- )
- );
- Assert.assertTrue(segmentOutputFolder.exists());
- Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length);
-
- for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) {
- File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum));
- Assert.assertTrue(individualSegmentFolder.exists());
-
- File indexZip = new File(individualSegmentFolder, "index.zip");
- Assert.assertTrue(indexZip.exists());
-
- intervalToIndexFiles.computeIfAbsent(new Interval(currTime, currTime.plusDays(1)), k -> new ArrayList<>())
- .add(indexZip);
- }
- }
-
- Assert.assertEquals(intervalToSegments.size(), intervalToIndexFiles.size());
-
- segmentNum = 0;
- for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
- final Interval interval = entry.getKey();
- final List<DataSegment> segments = entry.getValue();
- final List<File> indexFiles = intervalToIndexFiles.get(interval);
- Collections.sort(segments);
- indexFiles.sort(Comparator.comparing(File::getAbsolutePath));
-
- Assert.assertNotNull(indexFiles);
- Assert.assertEquals(segments.size(), indexFiles.size());
- Integer[][] shardInfo = shardInfoForEachSegment[segmentNum++];
-
- int rowCount = 0;
- for (int i = 0; i < segments.size(); i++) {
- final DataSegment dataSegment = segments.get(i);
- final File indexZip = indexFiles.get(i);
- Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
- Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
- Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
- Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());
-
- Assert.assertEquals(dataSourceName, dataSegment.getDataSource());
- Assert.assertEquals(1, dataSegment.getDimensions().size());
- String[] dimensions = dataSegment.getDimensions().toArray(new String[0]);
- Arrays.sort(dimensions);
- Assert.assertEquals("host", dimensions[0]);
- Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
- Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
-
- Integer[] hashShardInfo = shardInfo[i];
- HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
- Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
- Assert.assertEquals((int) hashShardInfo[1], spec.getPartitions());
-
- File dir = Files.createTempDir();
-
- unzip(indexZip, dir);
-
- QueryableIndex index = HadoopDruidIndexerConfig.INDEX_IO.loadIndex(dir);
- QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
-
- try (RowIterator rowIt = adapter.getRows()) {
- while (rowIt.moveToNext()) {
- rowCount++;
- Assert.assertEquals(2, rowIt.getPointer().getNumMetrics());
- }
- }
- }
- Assert.assertEquals(rowCount, data.size());
- }
- }
-
- private Map<Long, List<HadoopyShardSpec>> loadShardSpecs(
- Integer[][][] shardInfoForEachShard
- )
- {
- Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance());
- int shardCount = 0;
- int segmentNum = 0;
- for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
- List<ShardSpec> specs = new ArrayList<>();
- for (Integer[] shardInfo : shardInfoForEachShard[segmentNum++]) {
- specs.add(new HashBasedNumberedShardSpec(shardInfo[0], shardInfo[1], null, HadoopDruidIndexerConfig.JSON_MAPPER));
- }
- List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
- for (ShardSpec spec : specs) {
- actualSpecs.add(new HadoopyShardSpec(spec, shardCount++));
- }
-
- shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
- }
-
- return shardSpecs;
- }
-
- private void unzip(File zip, File outDir)
- {
- try {
- long size = 0L;
- final byte[] buffer = new byte[1 << 13];
- try (ZipInputStream in = new ZipInputStream(new FileInputStream(zip))) {
- for (ZipEntry entry = in.getNextEntry(); entry != null; entry = in.getNextEntry()) {
- final String fileName = entry.getName();
- try (final OutputStream out = new BufferedOutputStream(
- new FileOutputStream(
- outDir.getAbsolutePath()
- + File.separator
- + fileName
- ), 1 << 13
- )) {
- for (int len = in.read(buffer); len >= 0; len = in.read(buffer)) {
- if (len == 0) {
- continue;
- }
- size += len;
- out.write(buffer, 0, len);
- }
- out.flush();
- }
- }
- }
- }
- catch (IOException | RuntimeException exception) {
- }
- }
-}
diff --git a/extensions-core/orc-extensions/example/TestOrcFile.testDate1900.orc b/extensions-core/orc-extensions/example/TestOrcFile.testDate1900.orc
new file mode 100644
index 00000000000..f51ffdbd03a
Binary files /dev/null and b/extensions-core/orc-extensions/example/TestOrcFile.testDate1900.orc differ
diff --git a/extensions-core/orc-extensions/example/TestOrcFile.testDate2038.orc b/extensions-core/orc-extensions/example/TestOrcFile.testDate2038.orc
new file mode 100644
index 00000000000..cd11fa8a4e9
Binary files /dev/null and b/extensions-core/orc-extensions/example/TestOrcFile.testDate2038.orc differ
diff --git a/extensions-core/orc-extensions/example/orc-file-11-format-hadoop-job.json b/extensions-core/orc-extensions/example/orc-file-11-format-hadoop-job.json
new file mode 100644
index 00000000000..43dfdce039e
--- /dev/null
+++ b/extensions-core/orc-extensions/example/orc-file-11-format-hadoop-job.json
@@ -0,0 +1,85 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "example/orc-file-11-format.orc"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "/tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "test",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": [
+ {
+ "type": "path",
+ "name": "struct_list_struct_int",
+ "expr": "$.middle.list[1].int1"
+ },
+ {
+ "type": "path",
+ "name": "struct_list_struct_intlist",
+ "expr": "$.middle.list[*].int1"
+ },
+ {
+ "type": "path",
+ "name": "list_struct_string",
+ "expr": "$.list[0].string1"
+ },
+ {
+ "type": "path",
+ "name": "map_struct_int",
+ "expr": "$.map.chani.int1"
+ }
+ ]
+ },
+ "timestampSpec": {
+ "column": "ts",
+ "format": "millis"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2015-01-01/2017-01-01"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-core/orc-extensions/example/orc-file-11-format.orc b/extensions-core/orc-extensions/example/orc-file-11-format.orc
new file mode 100644
index 00000000000..41653c84035
Binary files /dev/null and b/extensions-core/orc-extensions/example/orc-file-11-format.orc differ
diff --git a/extensions-core/orc-extensions/example/orc_split_elim.orc b/extensions-core/orc-extensions/example/orc_split_elim.orc
new file mode 100644
index 00000000000..cd145d34310
Binary files /dev/null and b/extensions-core/orc-extensions/example/orc_split_elim.orc differ
diff --git a/extensions-core/orc-extensions/example/orc_split_elim_hadoop_job.json b/extensions-core/orc-extensions/example/orc_split_elim_hadoop_job.json
new file mode 100644
index 00000000000..f98475d06bf
--- /dev/null
+++ b/extensions-core/orc-extensions/example/orc_split_elim_hadoop_job.json
@@ -0,0 +1,64 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "example/orc_split_elim.orc"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "/tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "test",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": []
+ },
+ "timestampSpec": {
+ "column": "ts",
+ "format": "millis"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2015-01-01/2017-01-01"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-core/orc-extensions/example/testDate1900_hadoop_job.json b/extensions-core/orc-extensions/example/testDate1900_hadoop_job.json
new file mode 100644
index 00000000000..9d66f88c0a9
--- /dev/null
+++ b/extensions-core/orc-extensions/example/testDate1900_hadoop_job.json
@@ -0,0 +1,64 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "example/TestOrcFile.testDate1900.orc"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "/tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "test",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": []
+ },
+ "timestampSpec": {
+ "column": "time",
+ "format": "millis"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2015-01-01/2017-01-01"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-core/orc-extensions/example/testDate2038_hadoop_job.json b/extensions-core/orc-extensions/example/testDate2038_hadoop_job.json
new file mode 100644
index 00000000000..035c30e4f64
--- /dev/null
+++ b/extensions-core/orc-extensions/example/testDate2038_hadoop_job.json
@@ -0,0 +1,64 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "example/TestOrcFile.testDate2038.orc"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "/tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "test",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": []
+ },
+ "timestampSpec": {
+ "column": "time",
+ "format": "millis"
+ },
+ "dimensionsSpec": {
+ "dimensions": [
+ ],
+ "dimensionExclusions": [],
+ "spatialDimensions": []
+ }
+ }
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2015-01-01/2017-01-01"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-core/orc-extensions/example/test_1.orc b/extensions-core/orc-extensions/example/test_1.orc
new file mode 100644
index 00000000000..6ffa5512511
Binary files /dev/null and b/extensions-core/orc-extensions/example/test_1.orc differ
diff --git a/extensions-core/orc-extensions/example/test_1_hadoop_job.json b/extensions-core/orc-extensions/example/test_1_hadoop_job.json
new file mode 100755
index 00000000000..1ef0c4ed0dc
--- /dev/null
+++ b/extensions-core/orc-extensions/example/test_1_hadoop_job.json
@@ -0,0 +1,54 @@
+{
+ "type": "index_hadoop",
+ "spec": {
+ "ioConfig": {
+ "type": "hadoop",
+ "inputSpec": {
+ "type": "static",
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "example/test_1.orc"
+ },
+ "metadataUpdateSpec": {
+ "type": "postgresql",
+ "connectURI": "jdbc:postgresql://localhost/druid",
+ "user" : "druid",
+ "password" : "asdf",
+ "segmentTable": "druid_segments"
+ },
+ "segmentOutputPath": "/tmp/segments"
+ },
+ "dataSchema": {
+ "dataSource": "test",
+ "parser": {
+ "type": "orc",
+ "parseSpec": {
+ "format": "orc",
+ "timestampSpec": {
+ "column": "timestamp",
+ "format": "auto"
+ }
+ }
+ },
+ "metricsSpec": [],
+ "granularitySpec": {
+ "type": "uniform",
+ "segmentGranularity": "DAY",
+ "queryGranularity": "NONE",
+ "intervals": ["2015-01-01/2017-01-01"]
+ }
+ },
+ "tuningConfig": {
+ "type": "hadoop",
+ "workingPath": "tmp/working_path",
+ "partitionsSpec": {
+ "targetPartitionSize": 5000000
+ },
+ "jobProperties" : {
+ "mapreduce.map.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapreduce.reduce.java.opts": "-server -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
+ "mapred.child.java.opts": "-server -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"
+ },
+ "leaveIntermediate": true
+ }
+ }
+}
diff --git a/extensions-core/orc-extensions/example/test_2.orc b/extensions-core/orc-extensions/example/test_2.orc
new file mode 100644
index 00000000000..7e648916eaa
Binary files /dev/null and b/extensions-core/orc-extensions/example/test_2.orc differ
diff --git a/extensions-contrib/orc-extensions/example/hadoop_orc_job.json b/extensions-core/orc-extensions/example/test_2_hadoop_job.json
old mode 100755
new mode 100644
similarity index 78%
rename from extensions-contrib/orc-extensions/example/hadoop_orc_job.json
rename to extensions-core/orc-extensions/example/test_2_hadoop_job.json
index c85ebb6721a..30779f9040d
--- a/extensions-contrib/orc-extensions/example/hadoop_orc_job.json
+++ b/extensions-core/orc-extensions/example/test_2_hadoop_job.json
@@ -5,8 +5,8 @@
"type": "hadoop",
"inputSpec": {
"type": "static",
- "inputFormat": "org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat",
- "paths": "wikipedia.gz.orc"
+ "inputFormat": "org.apache.orc.mapreduce.OrcInputFormat",
+ "paths": "example/test_2.orc"
},
"metadataUpdateSpec": {
"type": "postgresql",
@@ -18,26 +18,32 @@
"segmentOutputPath": "/tmp/segments"
},
"dataSchema": {
- "dataSource": "wikipedia",
+ "dataSource": "test",
"parser": {
"type": "orc",
"parseSpec": {
- "format": "timeAndDims",
+ "format": "orc",
+ "flattenSpec": {
+ "useFieldDiscovery": true,
+ "fields": [
+ {
+ "type": "path",
+ "name": "col7-subcol7",
+ "expr": "$.col7.subcol7"
+ }
+ ]
+ },
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
- "timestamp",
- "col1",
- "col2"
],
"dimensionExclusions": [],
"spatialDimensions": []
}
- },
- "typeString": "struct,val1:float>"
+ }
},
"metricsSpec": [],
"granularitySpec": {
diff --git a/extensions-core/orc-extensions/pom.xml b/extensions-core/orc-extensions/pom.xml
new file mode 100644
index 00000000000..fa8c9df7342
--- /dev/null
+++ b/extensions-core/orc-extensions/pom.xml
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-orc-extensions</artifactId>
+  <name>druid-orc-extensions</name>
+  <description>druid-orc-extensions</description>
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>0.15.0-incubating-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+    <orc.version>1.5.5</orc.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-core</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-indexing-hadoop</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+      <version>${orc.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.esotericsoftware</groupId>
+          <artifactId>kryo-shaded</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.xml.bind</groupId>
+          <artifactId>jaxb-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>xmlenc</groupId>
+          <artifactId>xmlenc</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
similarity index 96%
rename from extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
rename to extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
index 595bb3b856e..138f85e3aaa 100644
--- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcExtensionsModule.java
@@ -36,7 +36,8 @@ public class OrcExtensionsModule implements DruidModule
return Collections.singletonList(
new SimpleModule("OrcInputRowParserModule")
.registerSubtypes(
- new NamedType(OrcHadoopInputRowParser.class, "orc")
+ new NamedType(OrcHadoopInputRowParser.class, "orc"),
+ new NamedType(OrcParseSpec.class, "orc")
)
);
}
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
new file mode 100644
index 00000000000..38b65d1cee2
--- /dev/null
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.common.parsers.ObjectFlattener;
+import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
+import org.apache.orc.mapred.OrcStruct;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.util.List;
+
+public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
+{
+ private final ParseSpec parseSpec;
+ private final ObjectFlattener<OrcStruct> orcStructFlattener;
+ private final MapInputRowParser parser;
+ private final boolean binaryAsString;
+
+ @JsonCreator
+ public OrcHadoopInputRowParser(
+ @JsonProperty("parseSpec") ParseSpec parseSpec,
+ @Nullable @JsonProperty("binaryAsString") Boolean binaryAsString
+ )
+ {
+ this.parseSpec = parseSpec;
+ this.binaryAsString = binaryAsString == null ? false : binaryAsString;
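+ // use the flattenSpec from an OrcParseSpec when one is supplied; otherwise fall back to
+ // JSONPathSpec.DEFAULT (automatic field discovery, no explicit flatten fields)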
+ final JSONPathSpec flattenSpec;
+ if (parseSpec instanceof OrcParseSpec) {
+ flattenSpec = ((OrcParseSpec) parseSpec).getFlattenSpec();
+ } else {
+ flattenSpec = JSONPathSpec.DEFAULT;
+ }
+ this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(this.binaryAsString));
+ this.parser = new MapInputRowParser(parseSpec);
+ }
+
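+ // flatten the ORC struct into a plain Map and delegate InputRow construction to the MapInputRowParser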
+ @NotNull
+ @Override
+ public List<InputRow> parseBatch(OrcStruct input)
+ {
+ return parser.parseBatch(orcStructFlattener.flatten(input));
+ }
+
+ @Override
+ public ParseSpec getParseSpec()
+ {
+ return parseSpec;
+ }
+
+ @Override
+ public InputRowParser withParseSpec(ParseSpec parseSpec)
+ {
+ return new OrcHadoopInputRowParser(parseSpec, binaryAsString);
+ }
+}
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcParseSpec.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcParseSpec.java
new file mode 100644
index 00000000000..6e46bfac318
--- /dev/null
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcParseSpec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.NestedDataParseSpec;
+import org.apache.druid.data.input.impl.ParseSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+
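+/**
+ * ParseSpec for ORC data: a NestedDataParseSpec that carries a flattenSpec (JSONPathSpec)
+ * alongside the usual timestampSpec and dimensionsSpec.
+ */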
+public class OrcParseSpec extends NestedDataParseSpec<JSONPathSpec>
+{
+{
+ @JsonCreator
+ public OrcParseSpec(
+ @JsonProperty("timestampSpec") TimestampSpec timestampSpec,
+ @JsonProperty("dimensionsSpec") DimensionsSpec dimensionsSpec,
+ @JsonProperty("flattenSpec") JSONPathSpec flattenSpec
+ )
+ {
+ super(
+ timestampSpec,
+ dimensionsSpec != null ? dimensionsSpec : DimensionsSpec.EMPTY,
+ flattenSpec != null ? flattenSpec : JSONPathSpec.DEFAULT
+ );
+ }
+
+ @Override
+ public ParseSpec withTimestampSpec(TimestampSpec spec)
+ {
+ return new OrcParseSpec(spec, getDimensionsSpec(), getFlattenSpec());
+ }
+
+ @Override
+ public ParseSpec withDimensionsSpec(DimensionsSpec spec)
+ {
+ return new OrcParseSpec(getTimestampSpec(), spec, getFlattenSpec());
+ }
+
+ @Override
+ public String toString()
+ {
+ return "OrcParseSpec{" +
+ "timestampSpec=" + getTimestampSpec() +
+ ", dimensionsSpec=" + getDimensionsSpec() +
+ ", flattenSpec=" + getFlattenSpec() +
+ "}";
+ }
+}
diff --git a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructConverter.java b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructConverter.java
new file mode 100644
index 00000000000..20fbf069720
--- /dev/null
+++ b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcStructConverter.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMap;
+import org.apache.orc.mapred.OrcStruct;
+import org.apache.orc.mapred.OrcTimestamp;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
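+/**
+ * Converts the Writable field values of an OrcStruct (e.g. Text, LongWritable, BytesWritable,
+ * OrcList, OrcMap, OrcTimestamp) into plain Java objects suitable for Druid row parsing.
+ */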
+public class OrcStructConverter
+{
+ @Nonnull
+ private static List