diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index be7f3c88340..b8c798a77b5 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
@@ -142,8 +143,9 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     }
   }
 
+  @VisibleForTesting
   @JsonProperty(PROP_PATHS)
-  private List<String> getInputPaths()
+  List<String> getInputPaths()
   {
     return inputPaths;
   }
@@ -199,7 +201,8 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   @Override
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
-    return new HdfsInputSource(split.get().toString(), configuration);
+    List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
+    return new HdfsInputSource(paths, configuration);
   }
 
   @Override
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index d2c7820d5c1..044930f838b 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -277,6 +278,21 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L));
       Assert.assertEquals(NUM_FILE, numSplits);
     }
+
+    @Test
+    public void createCorrectInputSourceWithSplit() throws Exception
+    {
+      // Set maxSplitSize to 1 so that each inputSplit has only one object
+      List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
+                                                  .collect(Collectors.toList());
+
+      for (InputSplit<List<Path>> split : splits) {
+        String expectedPath = Iterables.getOnlyElement(split.get()).toString();
+        HdfsInputSource inputSource = (HdfsInputSource) target.withSplit(split);
+        String actualPath = Iterables.getOnlyElement(inputSource.getInputPaths());
+        Assert.assertEquals(expectedPath, actualPath);
+      }
+    }
   }
 
   public static class EmptyPathsTest