Fix HDFS input source split (#9574)

Fixes an issue where splitting an HDFS input source for use in native
parallel batch ingestion would cause the subtasks to get a split with an
invalid HDFS path.
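
For context on the bug: the old withSplit implementation stringified the whole List<Path> held by the split, which yields a bracketed list representation rather than a usable path, while the fix converts each Path individually. A minimal standalone sketch of the difference (plain Java with String stand-ins for org.apache.hadoop.fs.Path; not part of the patch):

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class SplitPathSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the List<Path> carried by a single input split.
    List<String> splitPaths = Collections.singletonList("hdfs://namenode/foo/bar");

    // Before the fix: stringifying the whole list yields "[hdfs://namenode/foo/bar]",
    // which is not a valid HDFS path for the subtask.
    String broken = splitPaths.toString();

    // After the fix: each element is converted on its own, keeping valid path strings.
    List<String> fixed = splitPaths.stream().map(Object::toString).collect(Collectors.toList());

    System.out.println(broken);        // [hdfs://namenode/foo/bar]
    System.out.println(fixed.get(0));  // hdfs://namenode/foo/bar
  }
}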
Author: Chi Cao Minh, 2020-03-28 15:45:57 -07:00 (committed by GitHub)
Parent: 9081b5f25c
Commit: c0195a19e4
2 changed files with 21 additions and 2 deletions

HdfsInputSource.java

@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
@@ -142,8 +143,9 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     }
   }
 
+  @VisibleForTesting
   @JsonProperty(PROP_PATHS)
-  private List<String> getInputPaths()
+  List<String> getInputPaths()
   {
     return inputPaths;
   }
@@ -199,7 +201,8 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   @Override
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
-    return new HdfsInputSource(split.get().toString(), configuration);
+    List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
+    return new HdfsInputSource(paths, configuration);
   }
 
   @Override

HdfsInputSourceTest.java

@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -277,6 +278,21 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L));
       Assert.assertEquals(NUM_FILE, numSplits);
     }
+
+    @Test
+    public void createCorrectInputSourceWithSplit() throws Exception
+    {
+      // Set maxSplitSize to 1 so that each inputSplit has only one object
+      List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
+                                                  .collect(Collectors.toList());
+
+      for (InputSplit<List<Path>> split : splits) {
+        String expectedPath = Iterables.getOnlyElement(split.get()).toString();
+        HdfsInputSource inputSource = (HdfsInputSource) target.withSplit(split);
+        String actualPath = Iterables.getOnlyElement(inputSource.getInputPaths());
+        Assert.assertEquals(expectedPath, actualPath);
+      }
+    }
   }
 
   public static class EmptyPathsTest