Calculate max split size based on numMapTask in DatasourceInputFormat (#2882)

* Calculate max split size based on numMapTask

* updated docs & fixed possible ArithmeticException
Navis Ryu authored on 2016-07-21 08:53:51 +09:00, committed by Fangjin Yang
parent fd798d32bc
commit cd7337fc8a
3 changed files with 41 additions and 4 deletions


@@ -43,7 +43,7 @@ This is a type of `inputSpec` that reads data already stored inside Druid.
 |-----|----|-----------|--------|
 |type|String.|This should always be 'dataSource'.|yes|
 |ingestionSpec|JSON object.|Specification of Druid segments to be loaded. See below.|yes|
-|maxSplitSize|Number|Enables combining multiple segments into single Hadoop InputSplit according to size of segments. Default is none. |no|
+|maxSplitSize|Number|Enables combining multiple segments into a single Hadoop InputSplit, according to segment size. When set to -1, Druid calculates the max split size based on the user-specified number of map tasks (`mapred.map.tasks` or `mapreduce.job.maps`). By default, one split is made per segment.|no|

 Here is what goes inside `ingestionSpec`:
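The new `-1` behavior is driven by the standard Hadoop map-task setting named in the row above. A minimal sketch (the class and values here are illustrative, not part of this change) of how that setting is supplied and then read back through `JobConf`, which is what the input format does when `maxSplitSize` is -1:

```java
import org.apache.hadoop.mapred.JobConf;

public class MapTaskHintExample
{
  public static void main(String[] args)
  {
    JobConf conf = new JobConf();

    // Either key works; "mapred.map.tasks" is the legacy alias of "mapreduce.job.maps".
    conf.set("mapred.map.tasks", "3");

    // DatasourceInputFormat reads this hint via JobConf.getNumMapTasks()
    // when maxSplitSize is set to -1.
    System.out.println(conf.getNumMapTasks()); // 3
  }
}
```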


@@ -83,6 +83,16 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
     logger.info("segments to read [%s]", segmentsStr);
     long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0);
+    if (maxSize < 0) {
+      long totalSize = 0;
+      for (WindowedDataSegment segment : segments) {
+        totalSize += segment.getSegment().getSize();
+      }
+      int mapTask = ((JobConf)conf).getNumMapTasks();
+      if (mapTask > 0) {
+        maxSize = totalSize / mapTask;
+      }
+    }
     if (maxSize > 0) {
       //combining is to happen, let us sort the segments list by size so that they
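The added block derives the target split size as the total byte size of all selected segments divided by the requested number of map tasks; the `mapTask > 0` guard is what avoids the possible ArithmeticException mentioned in the commit message. A standalone sketch of that arithmetic (hypothetical helper with simplified types; the real code operates on `WindowedDataSegment` and `JobConf`):

```java
// Hypothetical helper mirroring the added logic: derive a max split size
// from the total segment bytes and the requested number of map tasks.
public final class SplitSizeSketch
{
  static long maxSplitSize(long configuredMaxSize, long[] segmentSizes, int numMapTasks)
  {
    long maxSize = configuredMaxSize;
    if (maxSize < 0) {
      long totalSize = 0;
      for (long size : segmentSizes) {
        totalSize += size;
      }
      // Only divide when a positive map-task count is configured,
      // so a zero value cannot trigger division by zero.
      if (numMapTasks > 0) {
        maxSize = totalSize / numMapTasks;
      }
    }
    return maxSize;
  }

  public static void main(String[] args)
  {
    // Three segments totalling 300 bytes with 3 requested map tasks -> 100 bytes per split.
    System.out.println(maxSplitSize(-1L, new long[]{120L, 100L, 80L}, 3));
  }
}
```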


@@ -28,7 +28,6 @@ import io.druid.indexer.JobHelper;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,7 +54,7 @@ public class DatasourceInputFormatTest
 {
   private List<WindowedDataSegment> segments;
   private List<LocatedFileStatus> locations;
-  private Configuration config;
+  private JobConf config;
   private JobContext context;

   @Before
@@ -142,7 +141,7 @@ public class DatasourceInputFormatTest
         )
     );

-    config = new Configuration();
+    config = new JobConf();
     config.set(
         DatasourceInputFormat.CONF_INPUT_SEGMENTS,
         new DefaultObjectMapper().writeValueAsString(segments)
@@ -238,6 +237,34 @@ public class DatasourceInputFormatTest
     Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
   }

+  @Test
+  public void testGetSplitsCombineCalculated() throws Exception
+  {
+    config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "-1");
+    config.setNumMapTasks(3);
+    List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
+
+    Assert.assertEquals(3, splits.size());
+
+    Assert.assertEquals(
+        Sets.newHashSet(segments.get(0)),
+        Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
+    );
+    Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations());
+
+    Assert.assertEquals(
+        Sets.newHashSet(segments.get(2)),
+        Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
+    );
+    Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations());
+
+    Assert.assertEquals(
+        Sets.newHashSet(segments.get(1)),
+        Sets.newHashSet((((DatasourceInputSplit) splits.get(2)).getSegments()))
+    );
+    Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(2).getLocations());
+  }

   @Test
   public void testGetRecordReader() throws Exception
   {