add parquet support to native batch (#8883)

* add parquet support to native batch

* cleanup

* implement toJson for sampler support

* better binaryAsString test

* docs

* i hate spellcheck

* refactor toMap conversion so can be shared through flattenerMaker, default impls should be good enough for orc+avro, fixup for merge with latest

* add comment, fix some stuff

* adjustments

* fix accident

* tweaks
This commit is contained in:
Clint Wylie 2019-11-22 10:49:16 -08:00 committed by Gian Merlino
parent 9955107e8e
commit 7250010388
26 changed files with 1978 additions and 43 deletions

View File

@ -69,11 +69,11 @@ public interface Firehose extends Closeable
/**
* Returns an {@link InputRowListPlusRawValues} object containing the InputRow plus the raw, unparsed data corresponding to
* the next row available. Used in the sampler to provide the caller with information to assist in configuring a parse
* spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusJson so
* spec. If a ParseException is thrown by the parser, it should be caught and returned in the InputRowListPlusRawValues so
* we will be able to provide information on the raw row which failed to be parsed. Should only be called if hasMore
* returns true.
*
* @return an InputRowListPlusJson which may contain any of: an InputRow, map of the raw data, or a ParseException
* @return an InputRowListPlusRawValues which may contain any of: an InputRow, map of the raw data, or a ParseException
*/
@Deprecated
default InputRowListPlusRawValues nextRowWithRaw() throws IOException
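
For context, a hedged sketch of how a sampler-style caller might drain a Firehose through the deprecated nextRowWithRaw(). The helper name and enclosing class are hypothetical; only hasMore(), nextRowWithRaw(), and getRawValues(), all of which appear elsewhere in this change, are assumed (imports elided).

// Hypothetical helper, not part of this change: collect the raw values for each row so the
// sampler can display them even when parsing fails. Any ParseException travels inside the
// returned InputRowListPlusRawValues rather than being thrown.
static List<Map<String, Object>> drainRawValues(Firehose firehose) throws IOException
{
  final List<Map<String, Object>> rawValues = new ArrayList<>();
  while (firehose.hasMore()) {
    // per the contract above, only call nextRowWithRaw() after hasMore() returns true
    final InputRowListPlusRawValues next = firehose.nextRowWithRaw();
    rawValues.add(next.getRawValues());
  }
  return rawValues;
}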

View File

@ -31,6 +31,7 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
import java.io.File;
import java.io.IOException;
/**
* InputFormat abstracts the file format of input data.
@ -57,5 +58,9 @@ public interface InputFormat
@JsonIgnore
boolean isSplittable();
InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory);
InputEntityReader createReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
) throws IOException;
}

View File

@ -32,7 +32,7 @@ public class FileEntity implements InputEntity
{
private final File file;
FileEntity(File file)
public FileEntity(File file)
{
this.file = file;
}

View File

@ -73,8 +73,8 @@ public class InputEntityIteratingReader implements InputSourceReader
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
try {
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
return reader.read();
}
catch (IOException e) {
@ -88,8 +88,8 @@ public class InputEntityIteratingReader implements InputSourceReader
{
return createIterator(entity -> {
// InputEntityReader is stateful and so a new one should be created per entity.
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
try {
final InputEntityReader reader = inputFormat.createReader(inputRowSchema, entity, temporaryDirectory);
return reader.sample();
}
catch (IOException e) {

View File

@ -24,6 +24,7 @@ import com.google.common.collect.FluentIterable;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import net.thisptr.jackson.jq.JsonQuery;
import net.thisptr.jackson.jq.exception.JsonQueryException;
@ -43,9 +44,11 @@ import java.util.function.Function;
public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonNode>
{
private static final JsonProvider JSON_PROVIDER = new FastJacksonJsonNodeJsonProvider();
private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder()
.jsonProvider(new FastJacksonJsonNodeJsonProvider())
.jsonProvider(JSON_PROVIDER)
.mappingProvider(new JacksonMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
@ -97,6 +100,12 @@ public class JSONFlattenerMaker implements ObjectFlatteners.FlattenerMaker<JsonN
}
}
@Override
public JsonProvider getJsonProvider()
{
return JSON_PROVIDER;
}
@Nullable
private Object valueConversionFunction(JsonNode val)
{

View File

@ -24,4 +24,6 @@ import java.util.Map;
public interface ObjectFlattener<T>
{
Map<String, Object> flatten(T obj);
Map<String, Object> toMap(T obj);
}
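
A hedged usage sketch of the new method (the locals flattenSpec, binaryAsString, and group are assumptions for illustration, and ParquetGroupFlattenerMaker comes from later in this change): flatten() applies the configured flattenSpec to build the row that gets ingested, while toMap() converts the whole raw record into plain Java Maps and Lists so the sampler can echo it back.

ObjectFlattener<Group> flattener =
    ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString));
Map<String, Object> flattenedRow = flattener.flatten(group); // shaped by the flattenSpec
Map<String, Object> rawRecord = flattener.toMap(group);      // full record, flattenSpec not applied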

View File

@ -20,13 +20,17 @@
package org.apache.druid.java.util.common.parsers;
import com.google.common.collect.Iterables;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.UOE;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
@ -194,17 +198,88 @@ public class ObjectFlatteners
}
};
}
@Override
public Map<String, Object> toMap(T obj)
{
return flattenerMaker.toMap(obj);
}
};
}
public interface FlattenerMaker<T>
{
JsonProvider getJsonProvider();
/**
* List all "root" primitive properties and primitive lists (no nested objects, no lists of objects)
*/
Iterable<String> discoverRootFields(T obj);
/**
* Get a top level field from a "json" object
*/
Object getRootField(T obj, String key);
/**
* Create a "field" extractor for {@link com.jayway.jsonpath.JsonPath} expressions
*/
Function<T, Object> makeJsonPathExtractor(String expr);
/**
* Create a "field" extractor for 'jq' expressions
*/
Function<T, Object> makeJsonQueryExtractor(String expr);
/**
* Convert object to Java {@link Map} using {@link #getJsonProvider()} and {@link #finalizeConversionForMap} to
* extract and convert data
*/
default Map<String, Object> toMap(T obj)
{
return (Map<String, Object>) toMapHelper(obj);
}
/**
* Recursively traverse "json" object using a {@link JsonProvider}, converting to Java {@link Map} and {@link List},
* potentially transforming via {@link #finalizeConversionForMap} as we go
*/
default Object toMapHelper(Object o)
{
final JsonProvider jsonProvider = getJsonProvider();
if (jsonProvider.isMap(o)) {
Map<String, Object> actualMap = new HashMap<>();
for (String key : jsonProvider.getPropertyKeys(o)) {
Object field = jsonProvider.getMapValue(o, key);
if (jsonProvider.isMap(field) || jsonProvider.isArray(field)) {
actualMap.put(key, toMapHelper(finalizeConversionForMap(field)));
} else {
actualMap.put(key, finalizeConversionForMap(field));
}
}
return actualMap;
} else if (jsonProvider.isArray(o)) {
final int length = jsonProvider.length(o);
List<Object> actualList = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
Object element = jsonProvider.getArrayIndex(o, i);
if (jsonProvider.isMap(element) || jsonProvider.isArray(element)) {
actualList.add(toMapHelper(finalizeConversionForMap(element)));
} else {
actualList.add(finalizeConversionForMap(element));
}
}
return finalizeConversionForMap(actualList);
}
// unknown, just pass it through
return o;
}
/**
* Handle any special conversions for object when translating an input type into a {@link Map} for {@link #toMap}
*/
default Object finalizeConversionForMap(Object o)
{
return o;
}
}
}
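
To illustrate why the default implementations are enough for the ORC and Avro makers, here is a minimal sketch (illustrative only, imports elided) of a FlattenerMaker that inherits a working toMap() just by supplying a JsonProvider. It uses the stock json-path JacksonJsonNodeJsonProvider as a stand-in for Druid's FastJacksonJsonNodeJsonProvider, and the path/jq extractors are stubbed because toMap() does not need them.

ObjectFlatteners.FlattenerMaker<JsonNode> minimal = new ObjectFlatteners.FlattenerMaker<JsonNode>()
{
  // any JsonProvider over the input type is enough for toMapHelper() to walk maps and arrays
  private final JsonProvider provider = new JacksonJsonNodeJsonProvider();

  @Override
  public JsonProvider getJsonProvider()
  {
    return provider;
  }

  @Override
  public Iterable<String> discoverRootFields(JsonNode obj)
  {
    // simplified for the sketch: returns all field names, ignoring the primitive-only contract
    return obj::fieldNames;
  }

  @Override
  public Object getRootField(JsonNode obj, String key)
  {
    return obj.get(key);
  }

  @Override
  public Function<JsonNode, Object> makeJsonPathExtractor(String expr)
  {
    throw new UnsupportedOperationException("not needed for toMap()");
  }

  @Override
  public Function<JsonNode, Object> makeJsonQueryExtractor(String expr)
  {
    throw new UnsupportedOperationException("not needed for toMap()");
  }
};
// minimal.toMap(jsonNode) now yields nested java.util.Map / List values. Leaves pass through
// finalizeConversionForMap(), which defaults to the identity, so makers with wrapped leaf types
// (like the Parquet maker later in this change) also override that hook.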

View File

@ -26,12 +26,51 @@ title: "Apache Parquet Extension"
This Apache Druid (incubating) module extends [Druid Hadoop based indexing](../../ingestion/hadoop.md) to ingest data directly from offline
Apache Parquet files.
Note: `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to
Note: If using the `parquet-avro` parser for Apache Hadoop based indexing, `druid-parquet-extensions` depends on the `druid-avro-extensions` module, so be sure to
[include both](../../development/extensions.md#loading-extensions).
## Parquet and Native Batch
This extension provides a `parquet` input format which can be used with Druid [native batch ingestion](../../ingestion/native-batch.md).
### Parquet InputFormat
|Field | Type | Description | Required|
|---|---|---|---|
|type| String| This should be set to `parquet` to read Parquet files| yes |
|flattenSpec| JSON Object |Define a [`flattenSpec`](../../ingestion/index.md#flattenspec) to extract nested values from a Parquet file. Note that only 'path' expressions are supported ('jq' is unavailable).| no (default will auto-discover 'root' level properties) |
| binaryAsString | Boolean | Specifies whether Parquet binary (bytes) columns that are not logically marked as string or enum types should be treated as UTF-8 encoded strings. | no (default == false) |
### Example
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "/some/path/to/file/",
"filter": "file.parquet"
},
"inputFormat": {
"type": "parquet"
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nested",
"expr": "$.path.to.nested"
}
]
},
"binaryAsString": false
},
...
}
...
```
## Parquet Hadoop Parser
This extension provides two ways to parse Parquet files:
For Hadoop, this extension provides two parser implementations for reading Parquet files:
* `parquet` - using a simple conversion contained within this extension
* `parquet-avro` - conversion to avro records with the `parquet-avro` library and using the `druid-avro-extensions` module
@ -62,7 +101,7 @@ However, `parquet-avro` was the original basis for this extension, and as such i
|----------|-------------|----------------------------------------------------------------------------------------|---------|
| type | String | Choose `parquet` or `parquet-avro` to determine how Parquet files are parsed | yes |
| parseSpec | JSON Object | Specifies the timestamp and dimensions of the data, and optionally, a flatten spec. Valid parseSpec formats are `timeAndDims`, `parquet`, `avro` (if used with avro conversion). | yes |
| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be converted to strings anyway. | no(default == false) |
| binaryAsString | Boolean | Specifies whether Parquet binary (bytes) columns that are not logically marked as string or enum types should be treated as UTF-8 encoded strings. | no (default == false) |
When the time dimension is a [DateType column](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md), a format should not be supplied. When the format is UTF8 (String), either `auto` or an explicitly defined [format](http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html) is required.

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input.avro;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
@ -39,9 +40,10 @@ import java.util.stream.Collectors;
public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<GenericRecord>
{
private static final JsonProvider AVRO_JSON_PROVIDER = new GenericAvroJsonProvider();
private static final Configuration JSONPATH_CONFIGURATION =
Configuration.builder()
.jsonProvider(new GenericAvroJsonProvider())
.jsonProvider(AVRO_JSON_PROVIDER)
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
@ -125,6 +127,12 @@ public class AvroFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Gener
throw new UnsupportedOperationException("Avro + JQ not supported");
}
@Override
public JsonProvider getJsonProvider()
{
return AVRO_JSON_PROVIDER;
}
private Object transformValue(final Object field)
{
if (field instanceof ByteBuffer) {

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input.orc;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.orc.TypeDescription;
@ -38,13 +39,15 @@ import java.util.function.Function;
public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker<OrcStruct>
{
private final Configuration jsonPathConfiguration;
private final JsonProvider orcJsonProvider;
private final OrcStructConverter converter;
OrcStructFlattenerMaker(boolean binaryAsString)
{
this.converter = new OrcStructConverter(binaryAsString);
this.orcJsonProvider = new OrcStructJsonProvider(converter);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(new OrcStructJsonProvider(converter))
.jsonProvider(orcJsonProvider)
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
@ -88,6 +91,12 @@ public class OrcStructFlattenerMaker implements ObjectFlatteners.FlattenerMaker<
throw new UnsupportedOperationException("ORC flattener does not support JQ");
}
@Override
public JsonProvider getJsonProvider()
{
return orcJsonProvider;
}
private Object finalizeConversion(Object o)
{
// replace any remaining complex types with null

View File

@ -137,10 +137,249 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<!--
for native batch indexing with Parquet files, we require a small number of classes provided by hadoop-common and
hadoop-mapreduce-client-core. However, both of these jars have a very large set of dependencies, the majority of
which we do not need (and are provided by Hadoop in that environment). hadoop-common is the biggest offender,
with things like ZooKeeper, Jetty, and much more. These exclusions remove ~60 jars from being unnecessarily
bundled with this extension. There might be some alternative arrangement to get what we need; it is worth looking
into if anyone is feeling adventurous.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<scope>provided</scope>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.compile.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>aopalliance</groupId>
<artifactId>aopalliance</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>javax.inject</groupId>
<artifactId>javax</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.compile.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.yetus</groupId>
<artifactId>audience-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-sslengine</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-util</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jetty</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
<exclusion>
<artifactId>xmlenc</artifactId>
<groupId>xmlenc</groupId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>jsch</artifactId>
<groupId>com.jcraft</groupId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
</exclusion>
<exclusion>
<artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
</exclusion>
<exclusion>
<artifactId>commons-digester</artifactId>
<groupId>commons-digester</groupId>
</exclusion>
<exclusion>
<artifactId>commons-beanutils-core</artifactId>
<groupId>commons-beanutils</groupId>
</exclusion>
<exclusion>
<artifactId>apacheds-kerberos-codec</artifactId>
<groupId>org.apache.directory.server</groupId>
</exclusion>
<exclusion>
<artifactId>nimbus-jose-jwt</artifactId>
<groupId>com.nimbusds</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
@ -162,16 +401,6 @@
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>

View File

@ -23,20 +23,33 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser;
import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser;
import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class ParquetExtensionsModule implements DruidModule
{
public static final String PARQUET_SIMPLE_INPUT_PARSER_TYPE = "parquet";
public static final String PARQUET_SIMPLE_PARSE_SPEC_TYPE = "parquet";
public static final String PARQUET_AVRO_INPUT_PARSER_TYPE = "parquet-avro";
public static final String PARQUET_AVRO_PARSE_SPEC_TYPE = "avro";
static final String PARQUET_SIMPLE_INPUT_PARSER_TYPE = "parquet";
static final String PARQUET_SIMPLE_PARSE_SPEC_TYPE = "parquet";
static final String PARQUET_AVRO_INPUT_PARSER_TYPE = "parquet-avro";
static final String PARQUET_AVRO_PARSE_SPEC_TYPE = "avro";
private Properties props = null;
@Inject
public void setProperties(Properties props)
{
this.props = props;
}
@Override
public List<? extends Module> getJacksonModules()
@ -46,7 +59,8 @@ public class ParquetExtensionsModule implements DruidModule
.registerSubtypes(
new NamedType(ParquetAvroHadoopInputRowParser.class, PARQUET_AVRO_INPUT_PARSER_TYPE),
new NamedType(ParquetHadoopInputRowParser.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE),
new NamedType(ParquetParseSpec.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE)
new NamedType(ParquetParseSpec.class, PARQUET_SIMPLE_INPUT_PARSER_TYPE),
new NamedType(ParquetInputFormat.class, PARQUET_SIMPLE_PARSE_SPEC_TYPE)
)
);
}
@ -54,6 +68,36 @@ public class ParquetExtensionsModule implements DruidModule
@Override
public void configure(Binder binder)
{
// this block of code is common among extensions that use Hadoop things but are not running in Hadoop, in order
// to properly initialize everything
final Configuration conf = new Configuration();
// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
conf.setClassLoader(getClass().getClassLoader());
// Ensure that FileSystem class level initialization happens with correct CL
// See https://github.com/apache/incubator-druid/issues/1714
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
FileSystem.get(conf);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
if (props != null) {
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), props.getProperty(propName));
}
}
}
binder.bind(Configuration.class).toInstance(conf);
}
}
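
A hedged illustration of the "hadoop." property passthrough in configure() above; the property name and value are made up for the example.

// A service property literally named "hadoop.fs.defaultFS" ends up in the bound Configuration
// with the "hadoop." prefix stripped, mirroring the loop in configure().
Properties props = new Properties();
props.setProperty("hadoop.fs.defaultFS", "hdfs://namenode:8020"); // example value, not from this change
Configuration conf = new Configuration();
for (String name : props.stringPropertyNames()) {
  if (name.startsWith("hadoop.")) {
    conf.set(name.substring("hadoop.".length()), props.getProperty(name));
  }
}
// conf.get("fs.defaultFS") now returns "hdfs://namenode:8020"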

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Objects;
public class ParquetInputFormat extends NestedInputFormat
{
private final boolean binaryAsString;
@JsonCreator
public ParquetInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
)
{
super(flattenSpec);
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
}
@JsonProperty
public boolean getBinaryAsString()
{
return binaryAsString;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputEntityReader createReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
) throws IOException
{
return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
ParquetInputFormat that = (ParquetInputFormat) o;
return binaryAsString == that.binaryAsString;
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), binaryAsString);
}
}
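
A hedged serde sketch in the style of a unit test (not part of this change): once the extension's Jackson modules are registered, the `parquet` inputFormat JSON shown in the docs above binds to ParquetInputFormat. It assumes InputFormat resolves subtypes through its "type" property and that readValue is called somewhere allowed to throw IOException.

ObjectMapper mapper = new ObjectMapper();
// register the NamedType mappings contributed by ParquetExtensionsModule, including "parquet"
new ParquetExtensionsModule().getJacksonModules().forEach(mapper::registerModule);
InputFormat format = mapper.readValue(
    "{\"type\": \"parquet\", \"binaryAsString\": true}",
    InputFormat.class
);
// ((ParquetInputFormat) format).getBinaryAsString() == true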

View File

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.parquet.simple.ParquetGroupFlattenerMaker;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
public class ParquetReader extends IntermediateRowParsingReader<Group>
{
private final InputRowSchema inputRowSchema;
private final ObjectFlattener<Group> flattener;
private final org.apache.parquet.hadoop.ParquetReader<Group> reader;
private final Closer closer;
ParquetReader(
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
JSONPathSpec flattenSpec,
boolean binaryAsString
) throws IOException
{
this.inputRowSchema = inputRowSchema;
this.flattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString));
closer = Closer.create();
byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
final InputEntity.CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
final Path path = new Path(file.file().toURI());
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build());
}
finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
}
@Override
protected CloseableIterator<Group> intermediateRowIterator()
{
return new CloseableIterator<Group>()
{
Group value = null;
@Override
public boolean hasNext()
{
if (value == null) {
try {
value = reader.read();
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
return value != null;
}
@Override
public Group next()
{
if (value == null) {
throw new NoSuchElementException();
}
Group currentValue = value;
value = null;
return currentValue;
}
@Override
public void close() throws IOException
{
closer.close();
}
};
}
@Override
protected List<InputRow> parseInputRows(Group intermediateRow) throws ParseException
{
return Collections.singletonList(
MapInputRowParser.parse(
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
flattener.flatten(intermediateRow)
)
);
}
@Override
protected Map<String, Object> toMap(Group intermediateRow)
{
return flattener.toMap(intermediateRow);
}
}

View File

@ -22,6 +22,7 @@ package org.apache.druid.data.input.parquet.simple;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.spi.json.JsonProvider;
import org.apache.druid.java.util.common.parsers.NotImplementedMappingProvider;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.parquet.example.data.Group;
@ -37,15 +38,16 @@ import java.util.stream.Collectors;
public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMaker<Group>
{
private final Configuration jsonPathConfiguration;
private final ParquetGroupConverter converter;
private final JsonProvider parquetJsonProvider;
ParquetGroupFlattenerMaker(boolean binaryAsString)
public ParquetGroupFlattenerMaker(boolean binaryAsString)
{
this.converter = new ParquetGroupConverter(binaryAsString);
this.parquetJsonProvider = new ParquetGroupJsonProvider(converter);
this.jsonPathConfiguration = Configuration.builder()
.jsonProvider(new ParquetGroupJsonProvider(converter))
.jsonProvider(parquetJsonProvider)
.mappingProvider(new NotImplementedMappingProvider())
.options(EnumSet.of(Option.SUPPRESS_EXCEPTIONS))
.build();
@ -86,6 +88,18 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMak
throw new UnsupportedOperationException("Parquet does not support JQ");
}
@Override
public JsonProvider getJsonProvider()
{
return parquetJsonProvider;
}
@Override
public Object finalizeConversionForMap(Object o)
{
return finalizeConversion(o);
}
/**
* After json conversion, wrapped list items may still need to be unwrapped. See
* {@link ParquetGroupConverter#isWrappedListPrimitive(Object)} and
@ -101,7 +115,7 @@ public class ParquetGroupFlattenerMaker implements ObjectFlatteners.FlattenerMak
if (ParquetGroupConverter.isWrappedListPrimitive(o)) {
return converter.unwrapListPrimitive(o);
} else if (o instanceof List) {
List<Object> asList = ((List<Object>) o).stream().filter(Objects::nonNull).collect(Collectors.toList());
List<Object> asList = ((List<?>) o).stream().filter(Objects::nonNull).collect(Collectors.toList());
if (asList.stream().allMatch(ParquetGroupConverter::isWrappedListPrimitive)) {
return asList.stream().map(Group.class::cast).map(converter::unwrapListPrimitive).collect(Collectors.toList());
}

View File

@ -39,7 +39,7 @@ public class ParquetGroupJsonProvider implements JsonProvider
{
private final ParquetGroupConverter converter;
ParquetGroupJsonProvider(ParquetGroupConverter converter)
public ParquetGroupJsonProvider(ParquetGroupConverter converter)
{
this.converter = converter;
}

View File

@ -47,21 +47,21 @@ import java.util.Map;
class BaseParquetInputTest
{
private static Map<String, String> parseSpecType = ImmutableMap.of(
private static final Map<String, String> PARSE_SPEC_TYPES = ImmutableMap.of(
ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE,
ParquetExtensionsModule.PARQUET_AVRO_PARSE_SPEC_TYPE,
ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE,
ParquetExtensionsModule.PARQUET_SIMPLE_PARSE_SPEC_TYPE
);
private static Map<String, String> inputFormatType = ImmutableMap.of(
private static final Map<String, String> INPUT_FORMAT_TYPES = ImmutableMap.of(
ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE,
"org.apache.druid.data.input.parquet.DruidParquetAvroInputFormat",
ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE,
"org.apache.druid.data.input.parquet.DruidParquetInputFormat"
);
private static Map<String, Class<? extends InputFormat>> inputFormatClass = ImmutableMap.of(
private static final Map<String, Class<? extends InputFormat>> INPUT_FORMAT_CLASSES = ImmutableMap.of(
ParquetExtensionsModule.PARQUET_AVRO_INPUT_PARSER_TYPE,
DruidParquetAvroInputFormat.class,
ParquetExtensionsModule.PARQUET_SIMPLE_INPUT_PARSER_TYPE,
@ -78,9 +78,9 @@ class BaseParquetInputTest
String template = Strings.utf8ToString(Files.readAllBytes(Paths.get(templateFile)));
String transformed;
if (withParseType) {
transformed = StringUtils.format(template, inputFormatType.get(type), type, parseSpecType.get(type));
transformed = StringUtils.format(template, INPUT_FORMAT_TYPES.get(type), type, PARSE_SPEC_TYPES.get(type));
} else {
transformed = StringUtils.format(template, inputFormatType.get(type), type);
transformed = StringUtils.format(template, INPUT_FORMAT_TYPES.get(type), type);
}
return HadoopDruidIndexerConfig.fromString(transformed);
}
@ -93,7 +93,7 @@ class BaseParquetInputTest
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
InputFormat inputFormat = ReflectionUtils.newInstance(
inputFormatClass.get(parserType),
INPUT_FORMAT_CLASSES.get(parserType),
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
@ -117,7 +117,7 @@ class BaseParquetInputTest
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
InputFormat inputFormat = ReflectionUtils.newInstance(
inputFormatClass.get(parserType),
INPUT_FORMAT_CLASSES.get(parserType),
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
class BaseParquetReaderTest
{
ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();
InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec) throws IOException
{
return createReader(parquetFile, schema, flattenSpec, false);
}
InputEntityReader createReader(
String parquetFile,
InputRowSchema schema,
JSONPathSpec flattenSpec,
boolean binaryAsString
) throws IOException
{
FileEntity entity = new FileEntity(new File(parquetFile));
ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString);
return parquet.createReader(schema, entity, null);
}
List<InputRow> readAllRows(InputEntityReader reader) throws IOException
{
List<InputRow> rows = new ArrayList<>();
try (CloseableIterator<InputRow> iterator = reader.read()) {
iterator.forEachRemaining(rows::add);
}
return rows;
}
List<InputRowListPlusRawValues> sampleAllRows(InputEntityReader reader) throws IOException
{
List<InputRowListPlusRawValues> rows = new ArrayList<>();
try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
iterator.forEachRemaining(rows::add);
}
return rows;
}
}

View File

@ -67,7 +67,7 @@ public class CompatParquetInputTest extends BaseParquetInputTest
InputRow row = ((List<InputRow>) config.getParser().parseBatch(data)).get(0);
// without binaryAsString: true, the value would something like "[104, 101, 121, 32, 116, 104, 105, 115, 32, 105, 115, 3.... ]"
// without binaryAsString: true, the value would be "aGV5IHRoaXMgaXMgJsOpKC3DqF/Dp8OgKT1eJMO5KiEgzqleXg=="
Assert.assertEquals("hey this is &é(-è_çà)=^$ù*! Ω^^", row.getDimension("field").get(0));
Assert.assertEquals(1471800234, row.getTimestampFromEpoch());
}

View File

@ -0,0 +1,440 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* Duplicate of {@link CompatParquetInputTest} but for {@link ParquetReader} instead of Hadoop
*/
public class CompatParquetReaderTest extends BaseParquetReaderTest
{
@Test
public void testBinaryAsString() throws IOException
{
final String file = "example/compat/284a0e001476716b-56d5676f53bd6e85_115466471_data.0.parq";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("field"))),
ImmutableList.of()
);
InputEntityReader reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT,
true
);
InputEntityReader readerNotAsString = createReader(
file,
schema,
JSONPathSpec.DEFAULT,
false
);
List<InputRow> rows = readAllRows(reader);
List<InputRow> rowsAsBinary = readAllRows(readerNotAsString);
Assert.assertEquals("hey this is &é(-è_çà)=^$ù*! Ω^^", rows.get(0).getDimension("field").get(0));
Assert.assertEquals(1471800234, rows.get(0).getTimestampFromEpoch());
Assert.assertEquals(
"aGV5IHRoaXMgaXMgJsOpKC3DqF/Dp8OgKT1eJMO5KiEgzqleXg==",
rowsAsBinary.get(0).getDimension("field").get(0)
);
Assert.assertEquals(1471800234, rowsAsBinary.get(0).getTimestampFromEpoch());
reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT,
true
);
readerNotAsString = createReader(
file,
schema,
JSONPathSpec.DEFAULT,
false
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
List<InputRowListPlusRawValues> sampledAsBinary = sampleAllRows(readerNotAsString);
final String expectedJson = "{\n"
+ " \"field\" : \"hey this is &é(-è_çà)=^$ù*! Ω^^\",\n"
+ " \"ts\" : 1471800234\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
final String expectedJsonBinary = "{\n"
+ " \"field\" : \"aGV5IHRoaXMgaXMgJsOpKC3DqF/Dp8OgKT1eJMO5KiEgzqleXg==\",\n"
+ " \"ts\" : 1471800234\n"
+ "}";
Assert.assertEquals(
expectedJsonBinary,
DEFAULT_JSON_WRITER.writeValueAsString(sampledAsBinary.get(0).getRawValues())
);
}
@Test
public void testParquet1217() throws IOException
{
final String file = "example/compat/parquet-1217.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "col", "col"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.col")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
Assert.assertEquals("-1", rows.get(0).getDimension("col").get(0));
Assert.assertEquals(-1, rows.get(0).getMetric("metric1"));
Assert.assertTrue(rows.get(4).getDimension("col").isEmpty());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"col\" : -1\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testParquetThriftCompat() throws IOException
{
/*
message ParquetSchema {
required boolean boolColumn;
required int32 byteColumn;
required int32 shortColumn;
required int32 intColumn;
required int64 longColumn;
required double doubleColumn;
required binary binaryColumn (UTF8);
required binary stringColumn (UTF8);
required binary enumColumn (ENUM);
optional boolean maybeBoolColumn;
optional int32 maybeByteColumn;
optional int32 maybeShortColumn;
optional int32 maybeIntColumn;
optional int64 maybeLongColumn;
optional double maybeDoubleColumn;
optional binary maybeBinaryColumn (UTF8);
optional binary maybeStringColumn (UTF8);
optional binary maybeEnumColumn (ENUM);
required group stringsColumn (LIST) {
repeated binary stringsColumn_tuple (UTF8);
}
required group intSetColumn (LIST) {
repeated int32 intSetColumn_tuple;
}
required group intToStringColumn (MAP) {
repeated group map (MAP_KEY_VALUE) {
required int32 key;
optional binary value (UTF8);
}
}
required group complexColumn (MAP) {
repeated group map (MAP_KEY_VALUE) {
required int32 key;
optional group value (LIST) {
repeated group value_tuple {
required group nestedIntsColumn (LIST) {
repeated int32 nestedIntsColumn_tuple;
}
required binary nestedStringColumn (UTF8);
}
}
}
}
}
*/
final String file = "example/compat/parquet-thrift-compat.snappy.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByLogicalMap", "$.intToStringColumn.1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractByComplexLogicalMap", "$.complexColumn.1[0].nestedIntsColumn[1]")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
Assert.assertEquals("true", rows.get(0).getDimension("boolColumn").get(0));
Assert.assertEquals("0", rows.get(0).getDimension("byteColumn").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("shortColumn").get(0));
Assert.assertEquals("2", rows.get(0).getDimension("intColumn").get(0));
Assert.assertEquals("0", rows.get(0).getDimension("longColumn").get(0));
Assert.assertEquals("0.2", rows.get(0).getDimension("doubleColumn").get(0));
Assert.assertEquals("val_0", rows.get(0).getDimension("binaryColumn").get(0));
Assert.assertEquals("val_0", rows.get(0).getDimension("stringColumn").get(0));
Assert.assertEquals("SPADES", rows.get(0).getDimension("enumColumn").get(0));
Assert.assertTrue(rows.get(0).getDimension("maybeBoolColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeByteColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeShortColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeIntColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeLongColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeDoubleColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeBinaryColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeStringColumn").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("maybeEnumColumn").isEmpty());
Assert.assertEquals("arr_0", rows.get(0).getDimension("stringsColumn").get(0));
Assert.assertEquals("arr_1", rows.get(0).getDimension("stringsColumn").get(1));
Assert.assertEquals("0", rows.get(0).getDimension("intSetColumn").get(0));
Assert.assertEquals("val_1", rows.get(0).getDimension("extractByLogicalMap").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("extractByComplexLogicalMap").get(0));
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"enumColumn\" : \"SPADES\",\n"
+ " \"maybeStringColumn\" : { },\n"
+ " \"maybeBinaryColumn\" : { },\n"
+ " \"shortColumn\" : 1,\n"
+ " \"byteColumn\" : 0,\n"
+ " \"maybeBoolColumn\" : { },\n"
+ " \"intColumn\" : 2,\n"
+ " \"doubleColumn\" : 0.2,\n"
+ " \"maybeByteColumn\" : { },\n"
+ " \"intSetColumn\" : [ 0 ],\n"
+ " \"boolColumn\" : true,\n"
+ " \"binaryColumn\" : \"val_0\",\n"
+ " \"maybeIntColumn\" : { },\n"
+ " \"intToStringColumn\" : {\n"
+ " \"0\" : \"val_0\",\n"
+ " \"1\" : \"val_1\",\n"
+ " \"2\" : \"val_2\"\n"
+ " },\n"
+ " \"maybeDoubleColumn\" : { },\n"
+ " \"maybeEnumColumn\" : { },\n"
+ " \"maybeLongColumn\" : { },\n"
+ " \"stringsColumn\" : [ \"arr_0\", \"arr_1\", \"arr_2\" ],\n"
+ " \"longColumn\" : 0,\n"
+ " \"stringColumn\" : \"val_0\",\n"
+ " \"maybeShortColumn\" : { },\n"
+ " \"complexColumn\" : {\n"
+ " \"0\" : [ {\n"
+ " \"nestedStringColumn\" : \"val_0\",\n"
+ " \"nestedIntsColumn\" : [ 0, 1, 2 ]\n"
+ " }, {\n"
+ " \"nestedStringColumn\" : \"val_1\",\n"
+ " \"nestedIntsColumn\" : [ 1, 2, 3 ]\n"
+ " }, {\n"
+ " \"nestedStringColumn\" : \"val_2\",\n"
+ " \"nestedIntsColumn\" : [ 2, 3, 4 ]\n"
+ " } ],\n"
+ " \"1\" : [ {\n"
+ " \"nestedStringColumn\" : \"val_0\",\n"
+ " \"nestedIntsColumn\" : [ 0, 1, 2 ]\n"
+ " }, {\n"
+ " \"nestedStringColumn\" : \"val_1\",\n"
+ " \"nestedIntsColumn\" : [ 1, 2, 3 ]\n"
+ " }, {\n"
+ " \"nestedStringColumn\" : \"val_2\",\n"
+ " \"nestedIntsColumn\" : [ 2, 3, 4 ]\n"
+ " } ],\n"
+ " \"2\" : [ {\n"
+ " \"nestedStringColumn\" : \"val_0\",\n"
+ " \"nestedIntsColumn\" : [ 0, 1, 2 ]\n"
+ " }, {\n"
+ " \"nestedStringColumn\" : \"val_1\",\n"
+ " \"nestedIntsColumn\" : [ 1, 2, 3 ]\n"
+ " }, {\n"
+ " \"nestedStringColumn\" : \"val_2\",\n"
+ " \"nestedIntsColumn\" : [ 2, 3, 4 ]\n"
+ " } ]\n"
+ " }\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testOldRepeatedInt() throws IOException
{
final String file = "example/compat/old-repeated-int.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("repeatedInt"))),
Collections.emptyList()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "repeatedInt", "repeatedInt")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
Assert.assertEquals("1", rows.get(0).getDimension("repeatedInt").get(0));
Assert.assertEquals("2", rows.get(0).getDimension("repeatedInt").get(1));
Assert.assertEquals("3", rows.get(0).getDimension("repeatedInt").get(2));
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"repeatedInt\" : [ 1, 2, 3 ]\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testReadNestedArrayStruct() throws IOException
{
final String file = "example/compat/nested-array-struct.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec", "extracted1", "extracted2"))),
Collections.emptyList()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted1", "$.myComplex[0].id"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extracted2", "$.myComplex[0].repeatedMessage[*].someId")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString());
Assert.assertEquals("5", rows.get(1).getDimension("primitive").get(0));
Assert.assertEquals("4", rows.get(1).getDimension("extracted1").get(0));
Assert.assertEquals("6", rows.get(1).getDimension("extracted2").get(0));
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"primitive\" : 2,\n"
+ " \"myComplex\" : [ {\n"
+ " \"id\" : 1,\n"
+ " \"repeatedMessage\" : [ 3 ]\n"
+ " } ]\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testProtoStructWithArray() throws IOException
{
final String file = "example/compat/proto-struct-with-array.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedOptional", "$.optionalMessage.someId"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedRequired", "$.requiredMessage.someId"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "extractedRepeated", "$.repeatedMessage[*]")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(0).getTimestamp().toString());
Assert.assertEquals("10", rows.get(0).getDimension("optionalPrimitive").get(0));
Assert.assertEquals("9", rows.get(0).getDimension("requiredPrimitive").get(0));
Assert.assertTrue(rows.get(0).getDimension("repeatedPrimitive").isEmpty());
Assert.assertTrue(rows.get(0).getDimension("extractedOptional").isEmpty());
Assert.assertEquals("9", rows.get(0).getDimension("extractedRequired").get(0));
Assert.assertEquals("9", rows.get(0).getDimension("extractedRepeated").get(0));
Assert.assertEquals("10", rows.get(0).getDimension("extractedRepeated").get(1));
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"optionalMessage\" : { },\n"
+ " \"requiredPrimitive\" : 9,\n"
+ " \"repeatedPrimitive\" : { },\n"
+ " \"repeatedMessage\" : [ 9, 10 ],\n"
+ " \"optionalPrimitive\" : 10,\n"
+ " \"requiredMessage\" : {\n"
+ " \"someId\" : 9\n"
+ " }\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
/**
* Duplicate of {@link DecimalParquetInputTest} but for {@link ParquetReader} instead of Hadoop
*/
public class DecimalParquetReaderTest extends BaseParquetReaderTest
{
@Test
public void testReadParquetDecimalFixedLen() throws IOException
{
final String file = "example/decimals/dec-in-fixed-len.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("fixed_len_dec"))),
ImmutableList.of("metric1")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "fixed_len_dec", "fixed_len_dec"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.fixed_len_dec")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString());
Assert.assertEquals("1.0", rows.get(1).getDimension("fixed_len_dec").get(0));
Assert.assertEquals(new BigDecimal("1.0"), rows.get(1).getMetric("metric1"));
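// a reader is single-use, so build a fresh one before sampling the same file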
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"fixed_len_dec\" : 1.0\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(1).getRawValues()));
}
@Test
public void testReadParquetDecimali32() throws IOException
{
final String file = "example/decimals/dec-in-i32.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i32_dec"))),
ImmutableList.of("metric1")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i32_dec"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.i32_dec")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString());
Assert.assertEquals("100", rows.get(1).getDimension("i32_dec").get(0));
Assert.assertEquals(new BigDecimal(100), rows.get(1).getMetric("metric1"));
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"i32_dec\" : 100\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(1).getRawValues()));
}
@Test
public void testReadParquetDecimali64() throws IOException
{
final String file = "example/decimals/dec-in-i64.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", DateTimes.of("2018-09-01T00:00:00.000Z")),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("i64_dec"))),
ImmutableList.of("metric1")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "i32_dec", "i64_dec"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric1", "$.i64_dec")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2018-09-01T00:00:00.000Z", rows.get(1).getTimestamp().toString());
Assert.assertEquals("100", rows.get(1).getDimension("i64_dec").get(0));
Assert.assertEquals(new BigDecimal(100), rows.get(1).getMetric("metric1"));
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"i64_dec\" : 100\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(1).getRawValues()));
}
}

View File

@ -36,7 +36,7 @@ import java.util.List;
@RunWith(Parameterized.class)
public class FlattenSpecParquetInputTest extends BaseParquetInputTest
{
private static final String TS1 = "2018-09-18T00:18:00.023Z";
static final String TS1 = "2018-09-18T00:18:00.023Z";
@Parameterized.Parameters(name = "type = {0}")
public static Iterable<Object[]> constructorFeeder()

View File

@ -0,0 +1,365 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* Duplicate of {@link FlattenSpecParquetInputTest} but for {@link ParquetReader} instead of Hadoop
*/
public class FlattenSpecParquetReaderTest extends BaseParquetReaderTest
{
private static final String FLAT_JSON = "{\n"
+ " \"listDim\" : [ \"listDim1v1\", \"listDim1v2\" ],\n"
+ " \"dim3\" : 1,\n"
+ " \"dim2\" : \"d2v1\",\n"
+ " \"dim1\" : \"d1v1\",\n"
+ " \"metric1\" : 1,\n"
+ " \"timestamp\" : 1537229880023\n"
+ "}";
private static final String NESTED_JSON = "{\n"
+ " \"nestedData\" : {\n"
+ " \"listDim\" : [ \"listDim1v1\", \"listDim1v2\" ],\n"
+ " \"dim3\" : 1,\n"
+ " \"dim2\" : \"d2v1\",\n"
+ " \"metric2\" : 2\n"
+ " },\n"
+ " \"dim1\" : \"d1v1\",\n"
+ " \"metric1\" : 1,\n"
+ " \"timestamp\" : 1537229880023\n"
+ "}";
@Test
public void testFlat1NoFlattenSpec() throws IOException
{
final String file = "example/flattening/test_flat_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "listDim"))),
ImmutableList.of("metric1", "metric2")
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0));
Assert.assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testFlat1Autodiscover() throws IOException
{
final String file = "example/flattening/test_flat_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1", "metric2")
);
InputEntityReader reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0));
Assert.assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testFlat1Flatten() throws IOException
{
final String file = "example/flattening/test_flat_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "dim3", "list"))),
ImmutableList.of("metric1", "metric2")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim2", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim3", null),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "list", "$.listDim")
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0));
Assert.assertEquals("listDim1v1", rows.get(0).getDimension("list").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("list").get(1));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testFlat1FlattenSelectListItem() throws IOException
{
final String file = "example/flattening/test_flat_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2", "listExtracted"))),
ImmutableList.of("metric1", "metric2")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim2", null),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "listExtracted", "$.listDim[1]")
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listExtracted").get(0));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(FLAT_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testNested1NoFlattenSpec() throws IOException
{
final String file = "example/flattening/test_nested_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))),
ImmutableList.of("metric1")
);
JSONPathSpec flattenSpec = new JSONPathSpec(false, ImmutableList.of());
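    // field discovery is disabled and no flatten fields are defined, so the nested data stays
    // unflattened and only the explicitly listed "dim1" dimension should come through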
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
List<String> dims = rows.get(0).getDimensions();
Assert.assertEquals(1, dims.size());
Assert.assertFalse(dims.contains("dim2"));
Assert.assertFalse(dims.contains("dim3"));
Assert.assertFalse(dims.contains("listDim"));
Assert.assertFalse(dims.contains("nestedData"));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testNested1Autodiscover() throws IOException
{
final String file = "example/flattening/test_nested_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1", "metric2")
);
InputEntityReader reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
List<String> dims = rows.get(0).getDimensions();
Assert.assertFalse(dims.contains("dim2"));
Assert.assertFalse(dims.contains("dim3"));
Assert.assertFalse(dims.contains("listDim"));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testNested1Flatten() throws IOException
{
final String file = "example/flattening/test_nested_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
ImmutableList.of("metric1", "metric2")
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim2", "$.nestedData.dim2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim3", "$.nestedData.dim3"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "metric2", "$.nestedData.metric2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "listDim", "$.nestedData.listDim[*]")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0));
Assert.assertEquals("listDim1v1", rows.get(0).getDimension("listDim").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listDim").get(1));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
Assert.assertEquals(2, rows.get(0).getMetric("metric2").longValue());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testNested1FlattenSelectListItem() throws IOException
{
final String file = "example/flattening/test_nested_1.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
List<JSONPathFieldSpec> flattenExpr = ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "timestamp", null),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "dim1", null),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim2", "$.nestedData.dim2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "dim3", "$.nestedData.dim3"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "listextracted", "$.nestedData.listDim[1]")
);
JSONPathSpec flattenSpec = new JSONPathSpec(true, flattenExpr);
InputEntityReader reader = createReader(
file,
schema,
flattenSpec
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals(FlattenSpecParquetInputTest.TS1, rows.get(0).getTimestamp().toString());
Assert.assertEquals("d1v1", rows.get(0).getDimension("dim1").get(0));
Assert.assertEquals("d2v1", rows.get(0).getDimension("dim2").get(0));
Assert.assertEquals("1", rows.get(0).getDimension("dim3").get(0));
Assert.assertEquals("listDim1v2", rows.get(0).getDimension("listextracted").get(0));
Assert.assertEquals(1, rows.get(0).getMetric("metric1").longValue());
reader = createReader(
file,
schema,
flattenSpec
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
Assert.assertEquals(NESTED_JSON, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* Duplicate of {@link TimestampsParquetInputTest} but for {@link ParquetReader} instead of Hadoop
*/
public class TimestampsParquetReaderTest extends BaseParquetReaderTest
{
@Test
public void testDateHandling() throws IOException
{
final String file = "example/timestamps/test_date_data.snappy.parquet";
InputRowSchema schemaAsString = new InputRowSchema(
new TimestampSpec("date_as_string", "Y-M-d", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
InputRowSchema schemaAsDate = new InputRowSchema(
new TimestampSpec("date_as_date", null, null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
InputEntityReader readerAsString = createReader(
file,
schemaAsString,
JSONPathSpec.DEFAULT
);
InputEntityReader readerAsDate = createReader(
file,
schemaAsDate,
JSONPathSpec.DEFAULT
);
List<InputRow> rowsWithString = readAllRows(readerAsString);
List<InputRow> rowsWithDate = readAllRows(readerAsDate);
Assert.assertEquals(rowsWithDate.size(), rowsWithString.size());
for (int i = 0; i < rowsWithDate.size(); i++) {
Assert.assertEquals(rowsWithString.get(i).getTimestamp(), rowsWithDate.get(i).getTimestamp());
}
readerAsString = createReader(
file,
schemaAsString,
JSONPathSpec.DEFAULT
);
readerAsDate = createReader(
file,
schemaAsDate,
JSONPathSpec.DEFAULT
);
List<InputRowListPlusRawValues> sampledAsString = sampleAllRows(readerAsString);
List<InputRowListPlusRawValues> sampledAsDate = sampleAllRows(readerAsDate);
final String expectedJson = "{\n"
+ " \"date_as_string\" : \"2017-06-18\",\n"
+ " \"timestamp_as_timestamp\" : 1497702471815,\n"
+ " \"timestamp_as_string\" : \"2017-06-17 14:27:51.815\",\n"
+ " \"idx\" : 1,\n"
+ " \"date_as_date\" : 1497744000000\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampledAsString.get(0).getRawValues()));
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampledAsDate.get(0).getRawValues()));
}
@Test
public void testParseInt96Timestamp() throws IOException
{
// the source parquet file was found in apache spark sql repo tests, where it is known as impala_timestamp.parq
// it has a single column, "ts" which is an int96 timestamp
final String file = "example/timestamps/int96_timestamp.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("ts", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
InputEntityReader reader = createReader(file, schema, JSONPathSpec.DEFAULT);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("2001-01-01T01:01:01.000Z", rows.get(0).getTimestamp().toString());
reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"ts\" : 978310861000\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
@Test
public void testTimeMillisInInt64() throws IOException
{
final String file = "example/timestamps/timemillis-in-i64.parquet";
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of())),
Collections.emptyList()
);
InputEntityReader reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("1970-01-01T00:00:00.010Z", rows.get(0).getTimestamp().toString());
reader = createReader(
file,
schema,
JSONPathSpec.DEFAULT
);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"time\" : 10\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.parquet;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/**
* Duplicate of {@link WikiParquetInputTest} but for {@link ParquetReader} instead of Hadoop
*/
public class WikiParquetReaderTest extends BaseParquetReaderTest
{
@Test
public void testWiki() throws IOException
{
InputRowSchema schema = new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))),
Collections.emptyList()
);
InputEntityReader reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT);
List<InputRow> rows = readAllRows(reader);
Assert.assertEquals("Gypsy Danger", rows.get(0).getDimension("page").get(0));
String s1 = rows.get(0).getDimension("language").get(0);
String s2 = rows.get(0).getDimension("language").get(1);
Assert.assertEquals("en", s1);
Assert.assertEquals("zh", s2);
reader = createReader("example/wiki/wiki.parquet", schema, JSONPathSpec.DEFAULT);
List<InputRowListPlusRawValues> sampled = sampleAllRows(reader);
final String expectedJson = "{\n"
+ " \"continent\" : \"North America\",\n"
+ " \"country\" : \"United States\",\n"
+ " \"added\" : 57,\n"
+ " \"city\" : \"San Francisco\",\n"
+ " \"unpatrolled\" : \"true\",\n"
+ " \"delta\" : -143,\n"
+ " \"language\" : [ \"en\", \"zh\" ],\n"
+ " \"robot\" : \"false\",\n"
+ " \"deleted\" : 200,\n"
+ " \"newPage\" : \"true\",\n"
+ " \"namespace\" : \"article\",\n"
+ " \"anonymous\" : \"false\",\n"
+ " \"page\" : \"Gypsy Danger\",\n"
+ " \"region\" : \"Bay Area\",\n"
+ " \"user\" : \"nuclear\",\n"
+ " \"timestamp\" : \"2013-08-31T01:02:33Z\"\n"
+ "}";
Assert.assertEquals(expectedJson, DEFAULT_JSON_WRITER.writeValueAsString(sampled.get(0).getRawValues()));
}
}

View File

@ -90,6 +90,7 @@ ISO8601
IndexSpec
IndexTask
InfluxDB
InputFormat
Integer.MAX_VALUE
JBOD
JDBC