HDFS input source (#8899)

* HDFS input source

Add support for using HDFS as an input source. In this version, commas
or globs are not supported in HDFS paths.

* Fix forbidden api

* Address review comments
Chi Cao Minh 2019-11-19 22:19:39 -08:00 committed by Jihoon Son
parent 074a45219d
commit 4ae6466ae2
7 changed files with 606 additions and 40 deletions
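
Below is a usage sketch, not part of this commit: it mirrors the ObjectMapper wiring in HdfsInputSourceTest and shows how a JSON spec with "type": "hdfs" resolves to the new HdfsInputSource through HdfsStorageDruidModule. The example class name and the namenode URI/path are placeholders, and it assumes the usual polymorphic type handling on InputSource (the same binding the test exercises through its Wrapper).

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.hadoop.conf.Configuration;

public class HdfsInputSourceJsonSketch
{
  public static void main(String[] args) throws Exception
  {
    // Same mapper setup as HdfsInputSourceTest: inject a Hadoop Configuration and register
    // the HDFS extension's Jackson modules so the "hdfs" type name is known.
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(
        new InjectableValues.Std().addValue(Configuration.class, new Configuration())
    );
    new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);

    // "paths" may be a single string or an array of strings; the URI below is a placeholder.
    InputSource inputSource = mapper.readValue(
        "{\"type\": \"hdfs\", \"paths\": \"hdfs://namenode:8020/foo/bar\"}",
        InputSource.class
    );
    System.out.println(inputSource.getClass().getSimpleName()); // prints HdfsInputSource
  }
}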


@@ -47,7 +47,7 @@ public class CsvInputFormat implements InputFormat
@JsonCreator
public CsvInputFormat(
@JsonProperty("columns") @Nullable List<String> columns,
@JsonProperty("listDelimiter") String listDelimiter,
@JsonProperty("listDelimiter") @Nullable String listDelimiter,
@Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow,
@JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader,
@JsonProperty("skipHeaderRows") int skipHeaderRows


@@ -201,10 +201,6 @@
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>


@@ -27,23 +27,17 @@ import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Path>
{
@@ -62,16 +56,8 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Path>
)
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "inputPaths");
this.conf = conf;
// Coerce 'inputPaths' to List<String>
if (inputPaths instanceof String) {
this.inputPaths = Collections.singletonList((String) inputPaths);
} else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
this.inputPaths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
} else {
throw new IAE("'inputPaths' must be a string or an array of strings");
}
}
@JsonProperty("paths")
@@ -83,23 +69,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Path>
@Override
protected Collection<Path> initObjects() throws IOException
{
// Use TextInputFormat to read splits. To do this, we need to make a fake Job.
final Job job = Job.getInstance(conf);
// Add paths to the fake JobContext.
inputPaths.forEach(input -> {
try {
FileInputFormat.addInputPaths(job, input);
}
catch (IOException e) {
throw new RuntimeException(e);
}
});
return new TextInputFormat().getSplits(job)
.stream()
.map(split -> ((FileSplit) split).getPath())
.collect(Collectors.toSet());
return HdfsInputSource.getPaths(inputPaths, conf);
}
@Override


@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
import org.apache.druid.utils.CompressionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
public class HdfsInputEntity implements InputEntity
{
private final Configuration conf;
private final Path path;
HdfsInputEntity(Configuration conf, Path path)
{
this.conf = conf;
this.path = path;
}
@Override
public URI getUri()
{
return path.toUri();
}
@Override
public InputStream open() throws IOException
{
FileSystem fs = path.getFileSystem(conf);
return CompressionUtils.decompress(fs.open(path), path.getName());
}
@Override
public Predicate<Throwable> getFetchRetryCondition()
{
return HdfsDataSegmentPuller.RETRY_PREDICATE;
}
}


@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.IAE;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class HdfsInputSource extends AbstractInputSource implements SplittableInputSource<Path>
{
private static final String PROP_PATHS = "paths";
private final List<String> inputPaths;
private final Configuration configuration;
// Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource
// *does* cache the splits for the following reasons:
//
// 1) It will improve compatibility with the index_hadoop task, allowing people to easily migrate from Hadoop.
// For example, input paths with globs will be supported (lazily expanding the wildcard glob is tricky).
//
// 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone
// migrating from Hadoop.
private List<Path> cachedPaths;
@JsonCreator
public HdfsInputSource(
@JsonProperty(PROP_PATHS) Object inputPaths,
@JacksonInject Configuration configuration
)
{
this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
this.configuration = configuration;
this.cachedPaths = null;
}
public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
{
final List<String> paths;
if (inputPaths instanceof String) {
paths = Collections.singletonList((String) inputPaths);
} else if (inputPaths instanceof List && ((List<?>) inputPaths).stream().allMatch(x -> x instanceof String)) {
paths = ((List<?>) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
} else {
throw new IAE("'%s' must be a string or an array of strings", propertyName);
}
return paths;
}
public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
{
if (inputPaths.isEmpty()) {
return Collections.emptySet();
}
// Use FileInputFormat to read splits. To do this, we need to make a fake Job.
Job job = Job.getInstance(configuration);
// Add paths to the fake JobContext.
for (String inputPath : inputPaths) {
FileInputFormat.addInputPaths(job, inputPath);
}
return new HdfsFileInputFormat().getSplits(job)
.stream()
.map(split -> ((FileSplit) split).getPath())
.collect(Collectors.toSet());
}
/**
* Helper for leveraging Hadoop code to interpret HDFS paths with globs
*/
private static class HdfsFileInputFormat extends FileInputFormat<Object, Object>
{
@Override
public RecordReader<Object, Object> createRecordReader(
org.apache.hadoop.mapreduce.InputSplit inputSplit,
TaskAttemptContext taskAttemptContext
)
{
throw new UnsupportedOperationException();
}
@Override
protected boolean isSplitable(JobContext context, Path filename)
{
return false; // prevent generating extra paths
}
}
@JsonProperty(PROP_PATHS)
private List<String> getInputPaths()
{
return inputPaths;
}
@Override
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
)
{
final Stream<InputSplit<Path>> splits;
try {
splits = createSplits(inputFormat, null);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
splits.map(split -> new HdfsInputEntity(configuration, split.get())),
temporaryDirectory
);
}
@Override
public Stream<InputSplit<Path>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
throws IOException
{
cachePathsIfNeeded();
return cachedPaths.stream().map(InputSplit::new);
}
@Override
public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
{
cachePathsIfNeeded();
return cachedPaths.size();
}
@Override
public SplittableInputSource<Path> withSplit(InputSplit<Path> split)
{
return new HdfsInputSource(split.get().toString(), configuration);
}
@Override
public boolean needsFormat()
{
return true;
}
private void cachePathsIfNeeded() throws IOException
{
if (cachedPaths == null) {
cachedPaths = ImmutableList.copyOf(Preconditions.checkNotNull(getPaths(inputPaths, configuration), "paths"));
}
}
static Builder builder()
{
return new Builder();
}
static final class Builder
{
private Object paths;
private Configuration configuration;
private Builder()
{
}
Builder paths(Object paths)
{
this.paths = paths;
return this;
}
Builder configuration(Configuration configuration)
{
this.configuration = configuration;
return this;
}
HdfsInputSource build()
{
return new HdfsInputSource(paths, configuration);
}
}
}
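
A minimal sketch, not from this commit, of using HdfsInputSource directly: the public constructor accepts either a single path string or a list of strings (coerceInputPathsToList), and createSplits() expands any glob through the FileInputFormat helper above, producing one cached split per matched file. The example class name, namenode URI, and glob are placeholders.

import org.apache.druid.data.input.InputSplit;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.hadoop.conf.Configuration;

public class HdfsInputSourceSplitsSketch
{
  public static void main(String[] args) throws Exception
  {
    // The glob is expanded to concrete file paths; the resulting splits are cached after the
    // first call, so getNumSplits() and createSplits() do not re-list the files.
    HdfsInputSource inputSource =
        new HdfsInputSource("hdfs://namenode:8020/foo/*", new Configuration());

    System.out.println("splits: " + inputSource.getNumSplits(null, null));
    inputSource.createSplits(null, null)
               .map(InputSplit::get)
               .forEach(path -> System.out.println(path));
  }
}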


@@ -33,6 +33,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
import org.apache.hadoop.conf.Configuration;
@@ -48,7 +49,7 @@ import java.util.Properties;
*/
public class HdfsStorageDruidModule implements DruidModule
{
public static final String SCHEME = "hdfs";
static final String SCHEME = "hdfs";
private Properties props = null;
@Inject
@ -63,7 +64,8 @@ public class HdfsStorageDruidModule implements DruidModule
return Collections.singletonList(
new SimpleModule().registerSubtypes(
new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsFirehoseFactory.class, HdfsStorageDruidModule.SCHEME)
new NamedType(HdfsFirehoseFactory.class, HdfsStorageDruidModule.SCHEME),
new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME)
)
);
}


@@ -0,0 +1,302 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@RunWith(Enclosed.class)
public class HdfsInputSourceTest
{
private static final String PATH = "/foo/bar";
private static final Configuration CONFIGURATION = new Configuration();
private static final String COLUMN = "value";
private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
new TimestampSpec(null, null, null),
DimensionsSpec.EMPTY,
Collections.emptyList()
);
private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
Arrays.asList(TimestampSpec.DEFAULT_COLUMN, COLUMN),
null,
false,
null,
0
);
public static class SerializeDeserializeTest
{
private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
private HdfsInputSource.Builder hdfsInputSourceBuilder;
@Rule
public ExpectedException exception = ExpectedException.none();
@Before
public void setup()
{
hdfsInputSourceBuilder = HdfsInputSource.builder()
.paths(PATH)
.configuration(CONFIGURATION);
}
@Test
public void requiresPathsAsStringOrArrayOfStrings()
{
exception.expect(IllegalArgumentException.class);
exception.expectMessage("'paths' must be a string or an array of strings");
hdfsInputSourceBuilder.paths(Arrays.asList("a", 1)).build();
}
@Test
public void serializesDeserializesWithArrayPaths()
{
Wrapper target = new Wrapper(hdfsInputSourceBuilder.paths(Collections.singletonList(PATH)));
testSerializesDeserializes(target);
}
@Test
public void serializesDeserializesStringPaths()
{
Wrapper target = new Wrapper(hdfsInputSourceBuilder.paths(PATH));
testSerializesDeserializes(target);
}
private static void testSerializesDeserializes(Wrapper hdfsInputSourceWrapper)
{
try {
String serialized = OBJECT_MAPPER.writeValueAsString(hdfsInputSourceWrapper);
Wrapper deserialized = OBJECT_MAPPER.readValue(serialized, Wrapper.class);
Assert.assertEquals(serialized, OBJECT_MAPPER.writeValueAsString(deserialized));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static ObjectMapper createObjectMapper()
{
final ObjectMapper mapper = new ObjectMapper();
mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
return mapper;
}
// Helper to test HdfsInputSource is added correctly to HdfsStorageDruidModule
private static class Wrapper
{
@JsonProperty
InputSource inputSource;
@SuppressWarnings("unused") // used by Jackson
private Wrapper()
{
}
Wrapper(HdfsInputSource.Builder hdfsInputSourceBuilder)
{
this.inputSource = hdfsInputSourceBuilder.build();
}
}
}
public static class ReaderTest
{
private static final String PATH = "/test";
private static final int NUM_FILE = 3;
private static final String KEY_VALUE_SEPARATOR = ",";
private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
private MiniDFSCluster dfsCluster;
private HdfsInputSource target;
private Set<Path> paths;
private Map<Long, String> timestampToValue;
@Before
public void setup() throws IOException
{
timestampToValue = new HashMap<>();
File dir = temporaryFolder.getRoot();
Configuration configuration = new Configuration(true);
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.getAbsolutePath());
dfsCluster = new MiniDFSCluster.Builder(configuration).build();
paths = IntStream.range(0, NUM_FILE)
.mapToObj(
i -> {
char value = ALPHABET.charAt(i % ALPHABET.length());
timestampToValue.put((long) i, Character.toString(value));
return createFile(
dfsCluster,
String.valueOf(i),
i + KEY_VALUE_SEPARATOR + value
);
}
)
.collect(Collectors.toSet());
target = HdfsInputSource.builder()
.paths(dfsCluster.getURI() + PATH + "*")
.configuration(CONFIGURATION)
.build();
}
@After
public void teardown()
{
if (dfsCluster != null) {
dfsCluster.shutdown(true);
}
}
private static Path createFile(MiniDFSCluster dfsCluster, String pathSuffix, String contents)
{
try {
Path path = new Path(PATH + pathSuffix);
try (Writer writer = new BufferedWriter(
new OutputStreamWriter(dfsCluster.getFileSystem().create(path), StandardCharsets.UTF_8)
)) {
writer.write(contents);
}
return path;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Test
public void readsSplitsCorrectly() throws IOException
{
InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
Map<Long, String> actualTimestampToValue = new HashMap<>();
try (CloseableIterator<InputRow> iterator = reader.read()) {
while (iterator.hasNext()) {
InputRow row = iterator.next();
actualTimestampToValue.put(row.getTimestampFromEpoch(), row.getDimension(COLUMN).get(0));
}
}
Assert.assertEquals(timestampToValue, actualTimestampToValue);
}
@Test
public void hasCorrectSplits() throws IOException
{
Set<Path> actualPaths = target.createSplits(null, null)
.map(split -> Path.getPathWithoutSchemeAndAuthority(split.get()))
.collect(Collectors.toSet());
Assert.assertEquals(paths, actualPaths);
}
@Test
public void hasCorrectNumberOfSplits() throws IOException
{
int numSplits = target.getNumSplits(null, null);
Assert.assertEquals(NUM_FILE, numSplits);
}
}
public static class EmptyPathsTest
{
private HdfsInputSource target;
@Before
public void setup()
{
target = HdfsInputSource.builder()
.paths(Collections.emptyList())
.configuration(CONFIGURATION)
.build();
}
@Test
public void readsSplitsCorrectly() throws IOException
{
InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
try (CloseableIterator<InputRow> iterator = reader.read()) {
Assert.assertFalse(iterator.hasNext());
}
}
@Test
public void hasCorrectSplits() throws IOException
{
List<Path> paths = target.createSplits(null, null)
.map(split -> Path.getPathWithoutSchemeAndAuthority(split.get()))
.collect(Collectors.toList());
Assert.assertTrue(String.valueOf(paths), paths.isEmpty());
}
@Test
public void hasCorrectNumberOfSplits() throws IOException
{
int numSplits = target.getNumSplits(null, null);
Assert.assertEquals(0, numSplits);
}
}
}