From 4ae6466ae2cd4a8f547f2bdf5cdbbd3c811d9deb Mon Sep 17 00:00:00 2001
From: Chi Cao Minh
Date: Tue, 19 Nov 2019 22:19:39 -0800
Subject: [PATCH] HDFS input source (#8899)

* HDFS input source

Add support for using HDFS as an input source. In this version, commas
or globs are not supported in HDFS paths.

* Fix forbidden api

* Address review comments
---
 .../druid/data/input/impl/CsvInputFormat.java |   2 +-
 extensions-core/hdfs-storage/pom.xml          |   4 -
 .../firehose/hdfs/HdfsFirehoseFactory.java    |  36 +--
 .../inputsource/hdfs/HdfsInputEntity.java     |  63 ++++
 .../inputsource/hdfs/HdfsInputSource.java     | 233 ++++++++++++++
 .../storage/hdfs/HdfsStorageDruidModule.java  |   6 +-
 .../inputsource/hdfs/HdfsInputSourceTest.java | 302 ++++++++++++++++++
 7 files changed, 606 insertions(+), 40 deletions(-)
 create mode 100644 extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
 create mode 100644 extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
 create mode 100644 extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
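For context, here is a minimal sketch (not part of the patch) of how the new input source is expected to be resolved through Jackson once HdfsStorageDruidModule registers the "hdfs" type. It mirrors the ObjectMapper setup used in HdfsInputSourceTest further down; the class name, namenode address, and path are hypothetical.

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class HdfsInputSourceSpecSketch
{
  public static void main(String[] args) throws IOException
  {
    // Same wiring the test's createObjectMapper() uses: register the extension's Jackson modules
    // and make a Hadoop Configuration injectable for @JacksonInject.
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
    new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);

    // A spec fragment like the one an ingestion task would carry; it should deserialize to HdfsInputSource.
    InputSource inputSource = mapper.readValue(
        "{\"type\": \"hdfs\", \"paths\": \"hdfs://namenode:8020/druid/wikipedia/*\"}",
        InputSource.class
    );
    System.out.println(inputSource.getClass().getName());
  }
}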
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java
index c05f47f594e..cf2a301a69e 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/CsvInputFormat.java
@@ -47,7 +47,7 @@ public class CsvInputFormat implements InputFormat
   @JsonCreator
   public CsvInputFormat(
       @JsonProperty("columns") @Nullable List<String> columns,
-      @JsonProperty("listDelimiter") String listDelimiter,
+      @JsonProperty("listDelimiter") @Nullable String listDelimiter,
       @Deprecated @JsonProperty("hasHeaderRow") @Nullable Boolean hasHeaderRow,
       @JsonProperty("findColumnsFromHeader") @Nullable Boolean findColumnsFromHeader,
       @JsonProperty("skipHeaderRows") int skipHeaderRows
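The change above is documentation only: listDelimiter could already be null, and the annotation now says so. A small sketch (hypothetical column names, not code from the patch) of constructing the format the way the new HdfsInputSourceTest does, with no list delimiter:

import org.apache.druid.data.input.impl.CsvInputFormat;

import java.util.Arrays;

public class CsvInputFormatSketch
{
  public static CsvInputFormat csvWithoutListDelimiter()
  {
    return new CsvInputFormat(
        Arrays.asList("timestamp", "value"),  // columns
        null,                                 // listDelimiter, now explicitly @Nullable
        false,                                // hasHeaderRow (deprecated)
        null,                                 // findColumnsFromHeader
        0                                     // skipHeaderRows
    );
  }
}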
diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml
index 1231840f517..977d9b446b0 100644
--- a/extensions-core/hdfs-storage/pom.xml
+++ b/extensions-core/hdfs-storage/pom.xml
@@ -201,10 +201,6 @@
           <groupId>org.mortbay.jetty</groupId>
           <artifactId>jetty-util</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-annotations</artifactId>
-        </exclusion>
         <exclusion>
           <groupId>com.google.protobuf</groupId>
           <artifactId>protobuf-java</artifactId>
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
index 8fe0eca1fad..962da5e5d31 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/firehose/hdfs/HdfsFirehoseFactory.java
@@ -27,23 +27,17 @@
 import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
-import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.inputsource.hdfs.HdfsInputSource;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
 import org.apache.druid.utils.CompressionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Path>
 {
@@ -62,16 +56,8 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<P
-    if (inputPaths instanceof String) {
-      this.inputPaths = Collections.singletonList((String) inputPaths);
-    } else if (inputPaths instanceof List && ((List) inputPaths).stream().allMatch(x -> x instanceof String)) {
-      this.inputPaths = ((List) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
-    } else {
-      throw new IAE("'inputPaths' must be a string or an array of strings");
-    }
+    this.inputPaths = HdfsInputSource.coerceInputPathsToList(inputPaths, "paths");
   }
 
   @JsonProperty("paths")
@@ -83,23 +69,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<P
   protected Collection<Path> initObjects() throws IOException
   {
-    // Use TextInputFormat to read splits. To do this, we need to make a fake Job.
-    final Job job = Job.getInstance(conf);
-
-    // Add paths to the fake JobContext.
-    inputPaths.forEach(input -> {
-      try {
-        FileInputFormat.addInputPaths(job, input);
-      }
-      catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    });
-
-    return new TextInputFormat().getSplits(job)
-                                .stream()
-                                .map(split -> ((FileSplit) split).getPath())
-                                .collect(Collectors.toSet());
+    return HdfsInputSource.getPaths(inputPaths, conf);
   }
 
   @Override
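The "paths" coercion that used to live in the firehose constructor now lives in HdfsInputSource.coerceInputPathsToList, so both classes accept either a single string or a list of strings. A small sketch of that behavior (the paths are hypothetical, not code from the patch):

import org.apache.druid.inputsource.hdfs.HdfsInputSource;

import java.util.Arrays;
import java.util.List;

public class CoercePathsSketch
{
  public static void main(String[] args)
  {
    // A single string becomes a singleton list.
    List<String> one = HdfsInputSource.coerceInputPathsToList("/druid/wikipedia/part-0", "paths");

    // A list of strings is passed through as-is.
    List<String> many = HdfsInputSource.coerceInputPathsToList(Arrays.asList("/druid/a", "/druid/b"), "paths");

    System.out.println(one);
    System.out.println(many);

    // Anything else fails fast, and the message names the offending property.
    try {
      HdfsInputSource.coerceInputPathsToList(Arrays.asList("/druid/a", 1), "paths");
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());  // 'paths' must be a string or an array of strings
    }
  }
}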
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
new file mode 100644
index 00000000000..039ca23f8cf
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.google.common.base.Predicate;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
+import org.apache.druid.utils.CompressionUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+public class HdfsInputEntity implements InputEntity
+{
+  private final Configuration conf;
+  private final Path path;
+
+  HdfsInputEntity(Configuration conf, Path path)
+  {
+    this.conf = conf;
+    this.path = path;
+  }
+
+  @Override
+  public URI getUri()
+  {
+    return path.toUri();
+  }
+
+  @Override
+  public InputStream open() throws IOException
+  {
+    FileSystem fs = path.getFileSystem(conf);
+    return CompressionUtils.decompress(fs.open(path), path.getName());
+  }
+
+  @Override
+  public Predicate<Throwable> getFetchRetryCondition()
+  {
+    return HdfsDataSegmentPuller.RETRY_PREDICATE;
+  }
+}
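HdfsInputEntity is normally created for you by HdfsInputSource, and its constructor is package-private. Purely as an illustration (placed in the same package for that reason; the namenode and path are hypothetical), reading one file through it looks roughly like this. Note that open() transparently decompresses based on the file extension via CompressionUtils:

package org.apache.druid.inputsource.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

class HdfsInputEntityReadSketch
{
  public static void main(String[] args) throws IOException
  {
    HdfsInputEntity entity = new HdfsInputEntity(
        new Configuration(),
        new Path("hdfs://namenode:8020/druid/events.csv.gz")
    );

    System.out.println(entity.getUri());
    try (BufferedReader reader =
             new BufferedReader(new InputStreamReader(entity.open(), StandardCharsets.UTF_8))) {
      reader.lines().forEach(System.out::println);
    }
  }
}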
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
new file mode 100644
index 00000000000..007e7e0d3af
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.InputEntityIteratingReader;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class HdfsInputSource extends AbstractInputSource implements SplittableInputSource<Path>
+{
+  private static final String PROP_PATHS = "paths";
+
+  private final List<String> inputPaths;
+  private final Configuration configuration;
+
+  // Although the javadocs for SplittableInputSource say to avoid caching splits to reduce memory, HdfsInputSource
+  // *does* cache the splits for the following reasons:
+  //
+  // 1) It will improve compatibility with the index_hadoop task, allowing people to easily migrate from Hadoop.
+  //    For example, input paths with globs will be supported (lazily expanding the wildcard glob is tricky).
+  //
+  // 2) The index_hadoop task allocates splits eagerly, so the memory usage should not be a problem for anyone
+  //    migrating from Hadoop.
+  private List<Path> cachedPaths;
+
+  @JsonCreator
+  public HdfsInputSource(
+      @JsonProperty(PROP_PATHS) Object inputPaths,
+      @JacksonInject Configuration configuration
+  )
+  {
+    this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
+    this.configuration = configuration;
+    this.cachedPaths = null;
+  }
+
+  public static List<String> coerceInputPathsToList(Object inputPaths, String propertyName)
+  {
+    final List<String> paths;
+
+    if (inputPaths instanceof String) {
+      paths = Collections.singletonList((String) inputPaths);
+    } else if (inputPaths instanceof List && ((List) inputPaths).stream().allMatch(x -> x instanceof String)) {
+      paths = ((List) inputPaths).stream().map(x -> (String) x).collect(Collectors.toList());
+    } else {
+      throw new IAE("'%s' must be a string or an array of strings", propertyName);
+    }
+
+    return paths;
+  }
+
+  public static Collection<Path> getPaths(List<String> inputPaths, Configuration configuration) throws IOException
+  {
+    if (inputPaths.isEmpty()) {
+      return Collections.emptySet();
+    }
+
+    // Use FileInputFormat to read splits. To do this, we need to make a fake Job.
+    Job job = Job.getInstance(configuration);
+
+    // Add paths to the fake JobContext.
+    for (String inputPath : inputPaths) {
+      FileInputFormat.addInputPaths(job, inputPath);
+    }
+
+    return new HdfsFileInputFormat().getSplits(job)
+                                    .stream()
+                                    .map(split -> ((FileSplit) split).getPath())
+                                    .collect(Collectors.toSet());
+  }
+
+  /**
+   * Helper for leveraging hadoop code to interpret HDFS paths with globs
+   */
+  private static class HdfsFileInputFormat extends FileInputFormat<Object, Object>
+  {
+    @Override
+    public RecordReader<Object, Object> createRecordReader(
+        org.apache.hadoop.mapreduce.InputSplit inputSplit,
+        TaskAttemptContext taskAttemptContext
+    )
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected boolean isSplitable(JobContext context, Path filename)
+    {
+      return false;  // prevent generating extra paths
+    }
+  }
+
+  @JsonProperty(PROP_PATHS)
+  private List<String> getInputPaths()
+  {
+    return inputPaths;
+  }
+
+  @Override
+  protected InputSourceReader formattableReader(
+      InputRowSchema inputRowSchema,
+      InputFormat inputFormat,
+      @Nullable File temporaryDirectory
+  )
+  {
+    final Stream<InputSplit<Path>> splits;
+    try {
+      splits = createSplits(inputFormat, null);
+    }
+    catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    return new InputEntityIteratingReader(
+        inputRowSchema,
+        inputFormat,
+        splits.map(split -> new HdfsInputEntity(configuration, split.get())),
+        temporaryDirectory
+    );
+  }
+
+  @Override
+  public Stream<InputSplit<Path>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+      throws IOException
+  {
+    cachePathsIfNeeded();
+    return cachedPaths.stream().map(InputSplit::new);
+  }
+
+  @Override
+  public int getNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException
+  {
+    cachePathsIfNeeded();
+    return cachedPaths.size();
+  }
+
+  @Override
+  public SplittableInputSource<Path> withSplit(InputSplit<Path> split)
+  {
+    return new HdfsInputSource(split.get().toString(), configuration);
+  }
+
+  @Override
+  public boolean needsFormat()
+  {
+    return true;
+  }
+
+  private void cachePathsIfNeeded() throws IOException
+  {
+    if (cachedPaths == null) {
+      cachedPaths = ImmutableList.copyOf(Preconditions.checkNotNull(getPaths(inputPaths, configuration), "paths"));
+    }
+  }
+
+  static Builder builder()
+  {
+    return new Builder();
+  }
+
+  static final class Builder
+  {
+    private Object paths;
+    private Configuration configuration;
+
+    private Builder()
+    {
+    }
+
+    Builder paths(Object paths)
+    {
+      this.paths = paths;
+      return this;
+    }
+
+    Builder configuration(Configuration configuration)
+    {
+      this.configuration = configuration;
+      return this;
+    }
+
+    HdfsInputSource build()
+    {
+      return new HdfsInputSource(paths, configuration);
+    }
+  }
+}
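getPaths is the piece that preserves index_hadoop-style path handling: paths are fed through FileInputFormat, so Hadoop glob syntax is expanded eagerly into concrete files. A sketch of calling the static helper directly (not code from the patch; the glob is hypothetical and needs a reachable filesystem, and file:// URIs also work since everything goes through the Hadoop FileSystem API):

import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

public class HdfsGlobExpansionSketch
{
  public static void main(String[] args) throws IOException
  {
    Collection<Path> paths = HdfsInputSource.getPaths(
        Collections.singletonList("hdfs://namenode:8020/druid/wikipedia/2019-11-*/part-*"),
        new Configuration()
    );
    // One Path per matched file; these become the input source's splits.
    paths.forEach(System.out::println);
  }
}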
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
index 5f5a1d0cf44..6e88f8ee38b 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/HdfsStorageDruidModule.java
@@ -33,6 +33,7 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.inputsource.hdfs.HdfsInputSource;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogs;
 import org.apache.druid.storage.hdfs.tasklog.HdfsTaskLogsConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -48,7 +49,7 @@ import java.util.Properties;
  */
 public class HdfsStorageDruidModule implements DruidModule
 {
-  public static final String SCHEME = "hdfs";
+  static final String SCHEME = "hdfs";
   private Properties props = null;
 
   @Inject
@@ -63,7 +64,8 @@ public class HdfsStorageDruidModule implements DruidModule
     return Collections.singletonList(
         new SimpleModule().registerSubtypes(
             new NamedType(HdfsLoadSpec.class, HdfsStorageDruidModule.SCHEME),
-            new NamedType(HdfsFirehoseFactory.class, HdfsStorageDruidModule.SCHEME)
+            new NamedType(HdfsFirehoseFactory.class, HdfsStorageDruidModule.SCHEME),
+            new NamedType(HdfsInputSource.class, HdfsStorageDruidModule.SCHEME)
         )
     );
   }
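With the NamedType registered, the input source should serialize under the "hdfs" type id whenever it is written as its declared InputSource type, which is what the Wrapper class in SerializeDeserializeTest below exercises. A hedged sketch of the round trip without the wrapper (the path is hypothetical, and the exact JSON field order is not guaranteed):

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class HdfsInputSourceSerdeSketch
{
  public static void main(String[] args) throws IOException
  {
    ObjectMapper mapper = new ObjectMapper();
    mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
    new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);

    InputSource original = new HdfsInputSource("hdfs://namenode:8020/druid/events.csv", new Configuration());

    // Writing against the declared type makes Jackson include the "type":"hdfs" id.
    String json = mapper.writerFor(InputSource.class).writeValueAsString(original);
    System.out.println(json);

    InputSource roundTripped = mapper.readValue(json, InputSource.class);
    System.out.println(roundTripped.getClass().getSimpleName());  // HdfsInputSource
  }
}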
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
new file mode 100644
index 00000000000..b0cfa20450f
--- /dev/null
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.inputsource.hdfs;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.storage.hdfs.HdfsStorageDruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+@RunWith(Enclosed.class)
+public class HdfsInputSourceTest
+{
+  private static final String PATH = "/foo/bar";
+  private static final Configuration CONFIGURATION = new Configuration();
+  private static final String COLUMN = "value";
+  private static final InputRowSchema INPUT_ROW_SCHEMA = new InputRowSchema(
+      new TimestampSpec(null, null, null),
+      DimensionsSpec.EMPTY,
+      Collections.emptyList()
+  );
+  private static final InputFormat INPUT_FORMAT = new CsvInputFormat(
+      Arrays.asList(TimestampSpec.DEFAULT_COLUMN, COLUMN),
+      null,
+      false,
+      null,
+      0
+  );
+
+  public static class SerializeDeserializeTest
+  {
+    private static final ObjectMapper OBJECT_MAPPER = createObjectMapper();
+
+    private HdfsInputSource.Builder hdfsInputSourceBuilder;
+
+    @Rule
+    public ExpectedException exception = ExpectedException.none();
+
+    @Before
+    public void setup()
+    {
+      hdfsInputSourceBuilder = HdfsInputSource.builder()
+                                              .paths(PATH)
+                                              .configuration(CONFIGURATION);
+    }
+
+    @Test
+    public void requiresPathsAsStringOrArrayOfStrings()
+    {
+      exception.expect(IllegalArgumentException.class);
+      exception.expectMessage("'paths' must be a string or an array of strings");
+
+      hdfsInputSourceBuilder.paths(Arrays.asList("a", 1)).build();
+    }
+
+    @Test
+    public void serializesDeserializesWithArrayPaths()
+    {
+      Wrapper target = new Wrapper(hdfsInputSourceBuilder.paths(Collections.singletonList(PATH)));
+      testSerializesDeserializes(target);
+    }
+
+    @Test
+    public void serializesDeserializesStringPaths()
+    {
+      Wrapper target = new Wrapper(hdfsInputSourceBuilder.paths(PATH));
+      testSerializesDeserializes(target);
+    }
+
+    private static void testSerializesDeserializes(Wrapper hdfsInputSourceWrapper)
+    {
+      try {
+        String serialized = OBJECT_MAPPER.writeValueAsString(hdfsInputSourceWrapper);
+        Wrapper deserialized = OBJECT_MAPPER.readValue(serialized, Wrapper.class);
+        Assert.assertEquals(serialized, OBJECT_MAPPER.writeValueAsString(deserialized));
+      }
+      catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    private static ObjectMapper createObjectMapper()
+    {
+      final ObjectMapper mapper = new ObjectMapper();
+      mapper.setInjectableValues(new InjectableValues.Std().addValue(Configuration.class, new Configuration()));
+      new HdfsStorageDruidModule().getJacksonModules().forEach(mapper::registerModule);
+      return mapper;
+    }
+
+    // Helper to test HdfsInputSource is added correctly to HdfsStorageDruidModule
+    private static class Wrapper
+    {
+      @JsonProperty
+      InputSource inputSource;
+
+      @SuppressWarnings("unused")  // used by Jackson
+      private Wrapper()
+      {
+      }
+
+      Wrapper(HdfsInputSource.Builder hdfsInputSourceBuilder)
+      {
+        this.inputSource = hdfsInputSourceBuilder.build();
+      }
+    }
+  }
+
+  public static class ReaderTest
+  {
+    private static final String PATH = "/test";
+    private static final int NUM_FILE = 3;
+    private static final String KEY_VALUE_SEPARATOR = ",";
+    private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
+
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    private MiniDFSCluster dfsCluster;
+    private HdfsInputSource target;
+    private Set<Path> paths;
+    private Map<Long, String> timestampToValue;
+
+    @Before
+    public void setup() throws IOException
+    {
+      timestampToValue = new HashMap<>();
+
+      File dir = temporaryFolder.getRoot();
+      Configuration configuration = new Configuration(true);
+      configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, dir.getAbsolutePath());
+      dfsCluster = new MiniDFSCluster.Builder(configuration).build();
+
+      paths = IntStream.range(0, NUM_FILE)
+                       .mapToObj(
+                           i -> {
+                             char value = ALPHABET.charAt(i % ALPHABET.length());
+                             timestampToValue.put((long) i, Character.toString(value));
+                             return createFile(
+                                 dfsCluster,
+                                 String.valueOf(i),
+                                 i + KEY_VALUE_SEPARATOR + value
+                             );
+                           }
+                       )
+                       .collect(Collectors.toSet());
+
+      target = HdfsInputSource.builder()
+                              .paths(dfsCluster.getURI() + PATH + "*")
+                              .configuration(CONFIGURATION)
+                              .build();
+    }
+
+    @After
+    public void teardown()
+    {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown(true);
+      }
+    }
+
+    private static Path createFile(MiniDFSCluster dfsCluster, String pathSuffix, String contents)
+    {
+      try {
+        Path path = new Path(PATH + pathSuffix);
+        try (Writer writer = new BufferedWriter(
+            new OutputStreamWriter(dfsCluster.getFileSystem().create(path), StandardCharsets.UTF_8)
+        )) {
+          writer.write(contents);
+        }
+        return path;
+      }
+      catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Test
+    public void readsSplitsCorrectly() throws IOException
+    {
+      InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
+
+      Map<Long, String> actualTimestampToValue = new HashMap<>();
+      try (CloseableIterator<InputRow> iterator = reader.read()) {
+        while (iterator.hasNext()) {
+          InputRow row = iterator.next();
+          actualTimestampToValue.put(row.getTimestampFromEpoch(), row.getDimension(COLUMN).get(0));
+        }
+      }
+
+      Assert.assertEquals(timestampToValue, actualTimestampToValue);
+    }
+
+    @Test
+    public void hasCorrectSplits() throws IOException
+    {
+      Set<Path> actualPaths = target.createSplits(null, null)
+                                    .map(split -> Path.getPathWithoutSchemeAndAuthority(split.get()))
+                                    .collect(Collectors.toSet());
+      Assert.assertEquals(paths, actualPaths);
+    }
+
+    @Test
+    public void hasCorrectNumberOfSplits() throws IOException
+    {
+      int numSplits = target.getNumSplits(null, null);
+      Assert.assertEquals(NUM_FILE, numSplits);
+    }
+  }
+
+  public static class EmptyPathsTest
+  {
+    private HdfsInputSource target;
+
+    @Before
+    public void setup()
+    {
+      target = HdfsInputSource.builder()
+                              .paths(Collections.emptyList())
+                              .configuration(CONFIGURATION)
+                              .build();
+    }
+
+    @Test
+    public void readsSplitsCorrectly() throws IOException
+    {
+      InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null);
+
+      try (CloseableIterator<InputRow> iterator = reader.read()) {
+        Assert.assertFalse(iterator.hasNext());
+      }
+    }
+
+    @Test
+    public void hasCorrectSplits() throws IOException
+    {
+      List<Path> paths = target.createSplits(null, null)
+                               .map(split -> Path.getPathWithoutSchemeAndAuthority(split.get()))
+                               .collect(Collectors.toList());
+      Assert.assertTrue(String.valueOf(paths), paths.isEmpty());
+    }
+
+    @Test
+    public void hasCorrectNumberOfSplits() throws IOException
+    {
+      int numSplits = target.getNumSplits(null, null);
+      Assert.assertEquals(0, numSplits);
+    }
+  }
+}
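To close the loop, here is a hedged sketch of how a splittable, parallel ingestion run would be expected to drive this input source: expand the (hypothetical) glob into splits up front, then give each sub-task a single-path copy via withSplit and read it through the generic InputSource reader entry point that formattableReader backs. This illustrates the intended call sequence; it is not code from the patch, and the class name, paths, and schema are placeholders.

import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.inputsource.hdfs.HdfsInputSource;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class HdfsSplitLifecycleSketch
{
  public static void main(String[] args) throws IOException
  {
    Configuration conf = new Configuration();
    HdfsInputSource inputSource = new HdfsInputSource("hdfs://namenode:8020/druid/events/*.csv", conf);

    // The supervisor task would enumerate splits once; HdfsInputSource caches the expanded paths.
    List<InputSplit<Path>> splits = inputSource.createSplits(null, null).collect(Collectors.toList());
    System.out.println("numSplits=" + inputSource.getNumSplits(null, null));

    InputRowSchema schema = new InputRowSchema(
        new TimestampSpec(null, null, null),
        DimensionsSpec.EMPTY,
        Collections.emptyList()
    );
    CsvInputFormat format = new CsvInputFormat(Arrays.asList("timestamp", "value"), null, false, null, 0);

    // Each sub-task would receive one split and read only that file.
    for (InputSplit<Path> split : splits) {
      SplittableInputSource<Path> perTask = inputSource.withSplit(split);
      try (CloseableIterator<InputRow> rows = perTask.reader(schema, format, null).read()) {
        while (rows.hasNext()) {
          System.out.println(rows.next());
        }
      }
    }
  }
}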