Support orc format for native batch ingestion (#8950)

* Support orc format for native batch ingestion

* fix pom and remove wrong comment

* fix unnecessary condition check

* use flatMap back to handle exception properly

* move exceptionThrowingIterator to intermediateRowParsingReader

* runtime
This commit is contained in:
Jihoon Son 2019-11-28 12:45:24 -08:00 committed by GitHub
parent 55ecaafff0
commit 86e8903523
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 975 additions and 124 deletions

View File

@ -19,13 +19,14 @@
package org.apache.druid.data.input;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
/**
* {@link InputEntityReader} that parses bytes into some intermediate rows first, and then into {@link InputRow}s.
@ -39,25 +40,60 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
@Override
public CloseableIterator<InputRow> read() throws IOException
{
return intermediateRowIterator().flatMap(row -> {
try {
// since parseInputRows() returns a list, the below line always iterates over the list,
// which means it calls Iterator.hasNext() and Iterator.next() at least once per row.
// This could be unnecessary if the row wouldn't be exploded into multiple inputRows.
// If this line turned out to be a performance bottleneck, perhaps parseInputRows() interface might not be a
// good idea. Subclasses could implement read() with some duplicate codes to avoid unnecessary iteration on
// a singleton list.
return CloseableIterators.withEmptyBaggage(parseInputRows(row).iterator());
final CloseableIterator<T> intermediateRowIterator = intermediateRowIterator();
return new CloseableIterator<InputRow>()
{
// since parseInputRows() returns a list, the below line always iterates over the list,
// which means it calls Iterator.hasNext() and Iterator.next() at least once per row.
// This could be unnecessary if the row wouldn't be exploded into multiple inputRows.
// If this line turned out to be a performance bottleneck, perhaps parseInputRows() interface might not be a
// good idea. Subclasses could implement read() with some duplicate codes to avoid unnecessary iteration on
// a singleton list.
Iterator<InputRow> rows = null;
@Override
public boolean hasNext()
{
if (rows == null || !rows.hasNext()) {
if (!intermediateRowIterator.hasNext()) {
return false;
}
final T row = intermediateRowIterator.next();
try {
rows = parseInputRows(row).iterator();
}
catch (IOException e) {
rows = new ExceptionThrowingIterator(new ParseException(e, "Unable to parse row [%s]", row));
}
catch (ParseException e) {
rows = new ExceptionThrowingIterator(e);
}
}
return true;
}
catch (IOException e) {
throw new ParseException(e, "Unable to parse row [%s]", row);
@Override
public InputRow next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
return rows.next();
}
});
@Override
public void close() throws IOException
{
intermediateRowIterator.close();
}
};
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample()
throws IOException
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return intermediateRowIterator().map(row -> {
final Map<String, Object> rawColumns;
@ -87,6 +123,9 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
/**
* Parses the given intermediate row into a list of {@link InputRow}s.
* This should return a non-empty list.
*
* @throws ParseException if it cannot parse the given intermediateRow properly
*/
protected abstract List<InputRow> parseInputRows(T intermediateRow) throws IOException, ParseException;
@ -95,4 +134,39 @@ public abstract class IntermediateRowParsingReader<T> implements InputEntityRead
* Implementations can use any method to convert the given row into a Map.
*/
protected abstract Map<String, Object> toMap(T intermediateRow) throws IOException;
private static class ExceptionThrowingIterator implements CloseableIterator<InputRow>
{
private final Exception exception;
private boolean thrown = false;
private ExceptionThrowingIterator(Exception exception)
{
this.exception = exception;
}
@Override
public boolean hasNext()
{
return !thrown;
}
@Override
public InputRow next()
{
thrown = true;
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else {
throw new RuntimeException(exception);
}
}
@Override
public void close() throws IOException
{
// do nothing
}
}
}

View File

@ -80,14 +80,8 @@ public interface CloseableIterator<T> extends Iterator<T>, Closeable
throw new UncheckedIOException(e);
}
}
try {
iterator = function.apply(delegate.next());
if (iterator.hasNext()) {
return iterator;
}
}
catch (Exception e) {
iterator = new ExceptionThrowingIterator<>(e);
iterator = function.apply(delegate.next());
if (iterator.hasNext()) {
return iterator;
}
}
@ -121,39 +115,4 @@ public interface CloseableIterator<T> extends Iterator<T>, Closeable
}
};
}
class ExceptionThrowingIterator<T> implements CloseableIterator<T>
{
private final Exception exception;
private boolean thrown = false;
private ExceptionThrowingIterator(Exception exception)
{
this.exception = exception;
}
@Override
public boolean hasNext()
{
return !thrown;
}
@Override
public T next()
{
thrown = true;
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else {
throw new RuntimeException(exception);
}
}
@Override
public void close() throws IOException
{
// do nothing
}
}
}

View File

@ -48,12 +48,6 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.compile.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-mapreduce</artifactId>
@ -178,12 +172,253 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>aopalliance</groupId>
<artifactId>aopalliance</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>javax.inject</groupId>
<artifactId>javax</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<scope>runtime</scope>
</dependency>
<!--
for native batch indexing with Orc files, we require a small number of classes provided by hadoop-common and
hadoop-mapreduce-client-core. However, both of these jars have a very large set of dependencies, the majority of
which we do not need (and are provided by Hadoop in that environment). hadoop-common is the biggest offender,
with things like zookeeper, jetty, just .. so much stuff. These exclusions remove ~60 jars from being unnecessarily
bundled with this extension. There might be some alternative arrangement to get what we need, worth looking into if
anyone is feeling adventurous.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.apache.yetus</groupId>
<artifactId>audience-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-kerberos-codec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
<exclusion>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
<exclusion>
<groupId>javax.ws.rs</groupId>
<artifactId>jsr311-api</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>com.nimbusds</groupId>
<artifactId>nimbus-jose-jwt</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.inject</groupId>
@ -213,6 +448,7 @@
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-storage-api</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>commons-lang</groupId>
@ -229,5 +465,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -23,13 +23,26 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Inject;
import org.apache.druid.initialization.DruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
public class OrcExtensionsModule implements DruidModule
{
private Properties props = null;
@Inject
public void setProperties(Properties props)
{
this.props = props;
}
@Override
public List<? extends Module> getJacksonModules()
{
@ -37,13 +50,45 @@ public class OrcExtensionsModule implements DruidModule
new SimpleModule("OrcInputRowParserModule")
.registerSubtypes(
new NamedType(OrcHadoopInputRowParser.class, "orc"),
new NamedType(OrcParseSpec.class, "orc")
)
new NamedType(OrcParseSpec.class, "orc"),
new NamedType(OrcInputFormat.class, "orc")
)
);
}
@Override
public void configure(Binder binder)
{
// this block of code is common among extensions that use Hadoop things but are not running in Hadoop, in order
// to properly initialize everything
final Configuration conf = new Configuration();
// Set explicit CL. Otherwise it'll try to use thread context CL, which may not have all of our dependencies.
conf.setClassLoader(getClass().getClassLoader());
// Ensure that FileSystem class level initialization happens with correct CL
// See https://github.com/apache/incubator-druid/issues/1714
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
FileSystem.get(conf);
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
}
if (props != null) {
for (String propName : props.stringPropertyNames()) {
if (propName.startsWith("hadoop.")) {
conf.set(propName.substring("hadoop.".length()), props.getProperty(propName));
}
}
}
binder.bind(Configuration.class).toInstance(conf);
}
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.NestedInputFormat;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Objects;
public class OrcInputFormat extends NestedInputFormat
{
private final boolean binaryAsString;
private final Configuration conf;
@JsonCreator
public OrcInputFormat(
@JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
@JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
@JacksonInject Configuration conf
)
{
super(flattenSpec);
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
this.conf = conf;
}
@Override
public boolean isSplittable()
{
return false;
}
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return new OrcReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
OrcInputFormat that = (OrcInputFormat) o;
return binaryAsString == that.binaryAsString &&
Objects.equals(conf, that.conf);
}
@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), binaryAsString, conf);
}
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcMapredRecordReader;
import org.apache.orc.mapred.OrcStruct;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
public class OrcReader extends IntermediateRowParsingReader<OrcStruct>
{
private final Configuration conf;
private final InputRowSchema inputRowSchema;
private final InputEntity source;
private final File temporaryDirectory;
private final ObjectFlattener<OrcStruct> orcStructFlattener;
OrcReader(
Configuration conf,
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory,
JSONPathSpec flattenSpec,
boolean binaryAsString
)
{
this.conf = conf;
this.inputRowSchema = inputRowSchema;
this.source = source;
this.temporaryDirectory = temporaryDirectory;
this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(binaryAsString));
}
@Override
protected CloseableIterator<OrcStruct> intermediateRowIterator() throws IOException
{
final Closer closer = Closer.create();
// We fetch here to cache a copy locally. However, this might need to be changed if we want to split an orc file
// into several InputSplits in the future.
final byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
final CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
final Path path = new Path(file.file().toURI());
final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
final Reader reader;
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
reader = closer.register(OrcFile.createReader(path, OrcFile.readerOptions(conf)));
}
finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
// The below line will get the schmea to read the whole columns.
// This can be improved by projecting some columns only what users want in the future.
final TypeDescription schema = reader.getSchema();
final RecordReader batchReader = reader.rows(reader.options());
final OrcMapredRecordReader<OrcStruct> recordReader = new OrcMapredRecordReader<>(batchReader, schema);
closer.register(recordReader::close);
return new CloseableIterator<OrcStruct>()
{
final NullWritable key = recordReader.createKey();
OrcStruct value = null;
@Override
public boolean hasNext()
{
if (value == null) {
try {
// The returned OrcStruct in next() can be kept in memory for a while.
// Here, we create a new instance of OrcStruct before calling RecordReader.next(),
// so that we can avoid to share the same reference to the "value" across rows.
value = recordReader.createValue();
if (!recordReader.next(key, value)) {
value = null;
}
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
return value != null;
}
@Override
public OrcStruct next()
{
if (value == null) {
throw new NoSuchElementException();
}
final OrcStruct currentValue = value;
value = null;
return currentValue;
}
@Override
public void close() throws IOException
{
closer.close();
}
};
}
@Override
protected List<InputRow> parseInputRows(OrcStruct intermediateRow) throws ParseException
{
return Collections.singletonList(
MapInputRowParser.parse(
inputRowSchema.getTimestampSpec(),
inputRowSchema.getDimensionsSpec(),
orcStructFlattener.flatten(intermediateRow)
)
);
}
@Override
protected Map<String, Object> toMap(OrcStruct intermediateRow)
{
return orcStructFlattener.toMap(intermediateRow);
}
}

View File

@ -27,16 +27,15 @@ import org.apache.druid.indexer.path.StaticPathSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.orc.mapred.OrcInputFormat;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;
import org.junit.Assert;
import org.junit.Test;
@ -44,11 +43,12 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
public class OrcHadoopInputRowParserTest
{
@Test
public void testTest1() throws IOException, InterruptedException
public void testTest1() throws IOException
{
// total auto-discover fields (no flattenSpec, no dimensionSpec)
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/test_1_hadoop_job.json");
@ -72,7 +72,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
public void testTest2() throws IOException, InterruptedException
public void testTest2() throws IOException
{
HadoopDruidIndexerConfig config = loadHadoopDruidIndexerConfig("example/test_2_hadoop_job.json");
Job job = Job.getInstance(new Configuration());
@ -97,7 +97,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
public void testOrcFile11Format() throws IOException, InterruptedException
public void testOrcFile11Format() throws IOException
{
// not sure what file 11 format means, but we'll test it!
@ -133,8 +133,8 @@ public class OrcHadoopInputRowParserTest
// first row has empty 'map' column, so lets read another!
List<InputRow> allRows = getAllRows(config);
InputRow anotherRow = allRows.get(0);
Assert.assertEquals(14, rows.get(0).getDimensions().size());
InputRow anotherRow = allRows.get(allRows.size() - 1);
Assert.assertEquals(14, anotherRow.getDimensions().size());
Assert.assertEquals("true", anotherRow.getDimension("boolean1").get(0));
Assert.assertEquals("100", anotherRow.getDimension("byte1").get(0));
Assert.assertEquals("2048", anotherRow.getDimension("short1").get(0));
@ -142,7 +142,7 @@ public class OrcHadoopInputRowParserTest
Assert.assertEquals("9223372036854775807", anotherRow.getDimension("long1").get(0));
Assert.assertEquals("2.0", anotherRow.getDimension("float1").get(0));
Assert.assertEquals("-5.0", anotherRow.getDimension("double1").get(0));
Assert.assertEquals("AAECAwQAAA==", rows.get(0).getDimension("bytes1").get(0));
Assert.assertEquals("", anotherRow.getDimension("bytes1").get(0));
Assert.assertEquals("bye", anotherRow.getDimension("string1").get(0));
Assert.assertEquals("1.23456786547457E7", anotherRow.getDimension("decimal1").get(0));
Assert.assertEquals("2", anotherRow.getDimension("struct_list_struct_int").get(0));
@ -151,7 +151,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
public void testOrcSplitElim() throws IOException, InterruptedException
public void testOrcSplitElim() throws IOException
{
// not sure what SplitElim means, but we'll test it!
@ -175,7 +175,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
public void testDate1900() throws IOException, InterruptedException
public void testDate1900() throws IOException
{
/*
TestOrcFile.testDate1900.orc
@ -194,7 +194,7 @@ public class OrcHadoopInputRowParserTest
}
@Test
public void testDate2038() throws IOException, InterruptedException
public void testDate2038() throws IOException
{
/*
TestOrcFile.testDate2038.orc
@ -217,54 +217,68 @@ public class OrcHadoopInputRowParserTest
return HadoopDruidIndexerConfig.fromFile(new File(configPath));
}
private static OrcStruct getFirstRow(Job job, String orcPath) throws IOException, InterruptedException
private static OrcStruct getFirstRow(Job job, String orcPath) throws IOException
{
File testFile = new File(orcPath);
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
FileSplit split = new FileSplit(path, 0, testFile.length(), new String[]{"host"});
InputFormat inputFormat = ReflectionUtils.newInstance(
InputFormat<NullWritable, OrcStruct> inputFormat = ReflectionUtils.newInstance(
OrcInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
reader.initialize(split, context);
reader.nextKeyValue();
return (OrcStruct) reader.getCurrentValue();
RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(
split,
new JobConf(job.getConfiguration()),
null
);
try {
final NullWritable key = reader.createKey();
final OrcStruct value = reader.createValue();
if (reader.next(key, value)) {
return value;
} else {
throw new NoSuchElementException();
}
}
finally {
reader.close();
}
}
private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config)
throws IOException, InterruptedException
private static List<InputRow> getAllRows(HadoopDruidIndexerConfig config) throws IOException
{
Job job = Job.getInstance(new Configuration());
config.intoConfiguration(job);
File testFile = new File(((StaticPathSpec) config.getPathSpec()).getPaths());
Path path = new Path(testFile.getAbsoluteFile().toURI());
FileSplit split = new FileSplit(path, 0, testFile.length(), null);
FileSplit split = new FileSplit(path, 0, testFile.length(), new String[]{"host"});
InputFormat inputFormat = ReflectionUtils.newInstance(
InputFormat<NullWritable, OrcStruct> inputFormat = ReflectionUtils.newInstance(
OrcInputFormat.class,
job.getConfiguration()
);
TaskAttemptContext context = new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID());
try (RecordReader reader = inputFormat.createRecordReader(split, context)) {
RecordReader<NullWritable, OrcStruct> reader = inputFormat.getRecordReader(
split,
new JobConf(job.getConfiguration()),
null
);
try {
List<InputRow> records = new ArrayList<>();
InputRowParser parser = config.getParser();
final NullWritable key = reader.createKey();
OrcStruct value = reader.createValue();
reader.initialize(split, context);
while (reader.nextKeyValue()) {
reader.nextKeyValue();
Object data = reader.getCurrentValue();
records.add(((List<InputRow>) parser.parseBatch(data)).get(0));
while (reader.next(key, value)) {
records.add(((List<InputRow>) parser.parseBatch(value)).get(0));
value = reader.createValue();
}
return records;
}
finally {
reader.close();
}
}
}

View File

@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.orc;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.FileEntity;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec;
import org.apache.druid.java.util.common.parsers.JSONPathFieldType;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
public class OrcReaderTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
// This test is migrated from OrcHadoopInputRowParserTest
@Test
public void testTest1() throws IOException
{
final InputEntityReader reader = createReader(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("col1", "col2"))),
new OrcInputFormat(null, null, new Configuration()),
"example/test_1.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2016-01-01T00:00:00.000Z"), row.getTimestamp());
Assert.assertEquals("bar", Iterables.getOnlyElement(row.getDimension("col1")));
Assert.assertEquals(ImmutableList.of("dat1", "dat2", "dat3"), row.getDimension("col2"));
Assert.assertEquals(1.1, row.getMetric("val1").doubleValue(), 0.001);
Assert.assertFalse(iterator.hasNext());
}
}
// This test is migrated from OrcHadoopInputRowParserTest
@Test
public void testTest2() throws IOException
{
final InputFormat inputFormat = new OrcInputFormat(
new JSONPathSpec(
true,
Collections.singletonList(new JSONPathFieldSpec(JSONPathFieldType.PATH, "col7-subcol7", "$.col7.subcol7"))
),
null,
new Configuration()
);
final InputEntityReader reader = createReader(
new TimestampSpec("timestamp", "auto", null),
new DimensionsSpec(null),
inputFormat,
"example/test_2.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
Assert.assertEquals(DateTimes.of("2016-01-01T00:00:00.000Z"), row.getTimestamp());
Assert.assertEquals("bar", Iterables.getOnlyElement(row.getDimension("col1")));
Assert.assertEquals(ImmutableList.of("dat1", "dat2", "dat3"), row.getDimension("col2"));
Assert.assertEquals("1.1", Iterables.getOnlyElement(row.getDimension("col3")));
Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("col4")));
Assert.assertEquals("3.5", Iterables.getOnlyElement(row.getDimension("col5")));
Assert.assertTrue(row.getDimension("col6").isEmpty());
Assert.assertFalse(iterator.hasNext());
}
}
// This test is migrated from OrcHadoopInputRowParserTest
@Test
public void testOrcFile11Format() throws IOException
{
final OrcInputFormat inputFormat = new OrcInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_int", "$.middle.list[1].int1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "struct_list_struct_intlist", "$.middle.list[*].int1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "list_struct_string", "$.list[0].string1"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "map_struct_int", "$.map.chani.int1")
)
),
null,
new Configuration()
);
final InputEntityReader reader = createReader(
new TimestampSpec("ts", "millis", null),
new DimensionsSpec(null),
inputFormat,
"example/orc-file-11-format.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
int actualRowCount = 0;
// Check the first row
Assert.assertTrue(iterator.hasNext());
InputRow row = iterator.next();
actualRowCount++;
Assert.assertEquals("false", Iterables.getOnlyElement(row.getDimension("boolean1")));
Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("byte1")));
Assert.assertEquals("1024", Iterables.getOnlyElement(row.getDimension("short1")));
Assert.assertEquals("65536", Iterables.getOnlyElement(row.getDimension("int1")));
Assert.assertEquals("9223372036854775807", Iterables.getOnlyElement(row.getDimension("long1")));
Assert.assertEquals("1.0", Iterables.getOnlyElement(row.getDimension("float1")));
Assert.assertEquals("-15.0", Iterables.getOnlyElement(row.getDimension("double1")));
Assert.assertEquals("AAECAwQAAA==", Iterables.getOnlyElement(row.getDimension("bytes1")));
Assert.assertEquals("hi", Iterables.getOnlyElement(row.getDimension("string1")));
Assert.assertEquals("1.23456786547456E7", Iterables.getOnlyElement(row.getDimension("decimal1")));
Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
Assert.assertEquals(ImmutableList.of("1", "2"), row.getDimension("struct_list_struct_intlist"));
Assert.assertEquals("good", Iterables.getOnlyElement(row.getDimension("list_struct_string")));
Assert.assertEquals(DateTimes.of("2000-03-12T15:00:00.0Z"), row.getTimestamp());
while (iterator.hasNext()) {
actualRowCount++;
row = iterator.next();
}
// Check the last row
Assert.assertEquals("true", Iterables.getOnlyElement(row.getDimension("boolean1")));
Assert.assertEquals("100", Iterables.getOnlyElement(row.getDimension("byte1")));
Assert.assertEquals("2048", Iterables.getOnlyElement(row.getDimension("short1")));
Assert.assertEquals("65536", Iterables.getOnlyElement(row.getDimension("int1")));
Assert.assertEquals("9223372036854775807", Iterables.getOnlyElement(row.getDimension("long1")));
Assert.assertEquals("2.0", Iterables.getOnlyElement(row.getDimension("float1")));
Assert.assertEquals("-5.0", Iterables.getOnlyElement(row.getDimension("double1")));
Assert.assertEquals("", Iterables.getOnlyElement(row.getDimension("bytes1")));
Assert.assertEquals("bye", Iterables.getOnlyElement(row.getDimension("string1")));
Assert.assertEquals("1.23456786547457E7", Iterables.getOnlyElement(row.getDimension("decimal1")));
Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("struct_list_struct_int")));
Assert.assertEquals(ImmutableList.of("1", "2"), row.getDimension("struct_list_struct_intlist"));
Assert.assertEquals("cat", Iterables.getOnlyElement(row.getDimension("list_struct_string")));
Assert.assertEquals("5", Iterables.getOnlyElement(row.getDimension("map_struct_int")));
Assert.assertEquals(DateTimes.of("2000-03-12T15:00:01.000Z"), row.getTimestamp());
Assert.assertEquals(7500, actualRowCount);
}
}
// This test is migrated from OrcHadoopInputRowParserTest
@Test
public void testOrcSplitElim() throws IOException
{
final InputEntityReader reader = createReader(
new TimestampSpec("ts", "millis", null),
new DimensionsSpec(null),
new OrcInputFormat(new JSONPathSpec(true, null), null, new Configuration()),
"example/orc_split_elim.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
int actualRowCount = 0;
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
actualRowCount++;
Assert.assertEquals(DateTimes.of("1969-12-31T16:00:00.0Z"), row.getTimestamp());
Assert.assertEquals("2", Iterables.getOnlyElement(row.getDimension("userid")));
Assert.assertEquals("foo", Iterables.getOnlyElement(row.getDimension("string1")));
Assert.assertEquals("0.8", Iterables.getOnlyElement(row.getDimension("subtype")));
Assert.assertEquals("1.2", Iterables.getOnlyElement(row.getDimension("decimal1")));
while (iterator.hasNext()) {
actualRowCount++;
iterator.next();
}
Assert.assertEquals(25000, actualRowCount);
}
}
// This test is migrated from OrcHadoopInputRowParserTest
@Test
public void testDate1900() throws IOException
{
final InputEntityReader reader = createReader(
new TimestampSpec("time", "millis", null),
new DimensionsSpec(null, Collections.singletonList("time"), null),
new OrcInputFormat(new JSONPathSpec(true, null), null, new Configuration()),
"example/TestOrcFile.testDate1900.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
int actualRowCount = 0;
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
actualRowCount++;
Assert.assertEquals(1, row.getDimensions().size());
Assert.assertEquals(DateTimes.of("1900-05-05T12:34:56.1Z"), row.getTimestamp());
Assert.assertEquals("1900-12-25T00:00:00.000Z", Iterables.getOnlyElement(row.getDimension("date")));
while (iterator.hasNext()) {
actualRowCount++;
iterator.next();
}
Assert.assertEquals(70000, actualRowCount);
}
}
// This test is migrated from OrcHadoopInputRowParserTest
@Test
public void testDate2038() throws IOException
{
final InputEntityReader reader = createReader(
new TimestampSpec("time", "millis", null),
new DimensionsSpec(null, Collections.singletonList("time"), null),
new OrcInputFormat(new JSONPathSpec(true, null), null, new Configuration()),
"example/TestOrcFile.testDate2038.orc"
);
try (CloseableIterator<InputRow> iterator = reader.read()) {
int actualRowCount = 0;
Assert.assertTrue(iterator.hasNext());
final InputRow row = iterator.next();
actualRowCount++;
Assert.assertEquals(1, row.getDimensions().size());
Assert.assertEquals(DateTimes.of("2038-05-05T12:34:56.1Z"), row.getTimestamp());
Assert.assertEquals("2038-12-25T00:00:00.000Z", Iterables.getOnlyElement(row.getDimension("date")));
while (iterator.hasNext()) {
actualRowCount++;
iterator.next();
}
Assert.assertEquals(212000, actualRowCount);
}
}
private InputEntityReader createReader(
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,
InputFormat inputFormat,
String dataFile
) throws IOException
{
final InputRowSchema schema = new InputRowSchema(timestampSpec, dimensionsSpec, Collections.emptyList());
final FileEntity entity = new FileEntity(new File(dataFile));
return inputFormat.createReader(schema, entity, temporaryFolder.newFolder());
}
}

View File

@ -288,72 +288,72 @@
<artifactId>jersey-json</artifactId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-sslengine</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jets3t</artifactId>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<artifactId>jetty</artifactId>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</exclusion>
<exclusion>
<artifactId>xmlenc</artifactId>
<groupId>xmlenc</groupId>
<artifactId>xmlenc</artifactId>
</exclusion>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<artifactId>jsch</artifactId>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</exclusion>
<exclusion>
<artifactId>protobuf-java</artifactId>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-collections</artifactId>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-logging</artifactId>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-cli</artifactId>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-digester</artifactId>
<groupId>commons-digester</groupId>
<artifactId>commons-digester</artifactId>
</exclusion>
<exclusion>
<artifactId>commons-beanutils-core</artifactId>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
<exclusion>
<artifactId>apacheds-kerberos-codec</artifactId>
<groupId>org.apache.directory.server</groupId>
<artifactId>apacheds-kerberos-codec</artifactId>
</exclusion>
<exclusion>
<artifactId>nimbus-jose-jwt</artifactId>
<groupId>com.nimbusds</groupId>
<artifactId>nimbus-jose-jwt</artifactId>
</exclusion>
</exclusions>
</dependency>

View File

@ -62,6 +62,7 @@ import org.apache.druid.segment.loading.NoopDataSegmentKiller;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.junit.After;
import org.junit.Before;
@ -78,7 +79,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
public abstract class IngestionTestBase
public abstract class IngestionTestBase extends InitializedNullHandlingTest
{
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();