pass configuration from context into JobConf for determining DatasourceInputFormat splits (#5408)

This commit is contained in:
David Lim 2018-02-21 11:01:32 -07:00 committed by Fangjin Yang
parent f3796bc81b
commit 5c56d01daa
1 changed file with 6 additions and 8 deletions

View File

@@ -29,7 +29,6 @@ import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -65,7 +64,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
@Override
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
{
Configuration conf = context.getConfiguration();
JobConf conf = new JobConf(context.getConfiguration());
String segmentsStr = Preconditions.checkNotNull(
conf.get(CONF_INPUT_SEGMENTS),
@@ -92,7 +91,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize();
}
int mapTask = ((JobConf) conf).getNumMapTasks();
int mapTask = conf.getNumMapTasks();
if (mapTask > 0) {
maxSize = totalSize / mapTask;
}
@@ -119,11 +118,10 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;
JobConf dummyConf = new JobConf();
org.apache.hadoop.mapred.InputFormat fio = supplier.get();
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(toDataSourceSplit(list, fio, dummyConf));
splits.add(toDataSourceSplit(list, fio, conf));
list = Lists.newArrayList();
size = 0;
}
@@ -133,7 +131,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
}
if (list.size() > 0) {
splits.add(toDataSourceSplit(list, fio, dummyConf));
splits.add(toDataSourceSplit(list, fio, conf));
}
logger.info("Number of splits [%d]", splits.size());
@@ -217,14 +215,14 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
try {
return Arrays.stream(split.getLocations());
}
catch (final IOException e) {
catch (final Exception e) {
logger.error(e, "Exception getting locations");
return Stream.empty();
}
}
);
}
catch (final IOException e) {
catch (final Exception e) {
logger.error(e, "Exception getting splits");
return Stream.empty();
}