diff --git a/docs/content/ingestion/update-existing-data.md b/docs/content/ingestion/update-existing-data.md
index c709ff504a7..92da8a0ae49 100644
--- a/docs/content/ingestion/update-existing-data.md
+++ b/docs/content/ingestion/update-existing-data.md
@@ -43,7 +43,7 @@ This is a type of `inputSpec` that reads data already stored inside Druid.
 |-----|----|-----------|--------|
 |type|String.|This should always be 'dataSource'.|yes|
 |ingestionSpec|JSON object.|Specification of Druid segments to be loaded. See below.|yes|
-|maxSplitSize|Number|Enables combining multiple segments into single Hadoop InputSplit according to size of segments. Default is none. |no|
+|maxSplitSize|Number|Enables combining multiple segments into a single Hadoop InputSplit according to the size of the segments. When set to -1, Druid calculates the maximum split size from the user-specified number of map tasks (`mapred.map.tasks` or `mapreduce.job.maps`). By default, one split is made per segment.|no|
 
 Here is what goes inside `ingestionSpec`:
 
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
index 09d20c6524b..5e158178a8c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
@@ -83,6 +83,16 @@ public class DatasourceInputFormat extends InputFormat
     logger.info("segments to read [%s]", segmentsStr);
 
     long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0);
+    if (maxSize < 0) {
+      long totalSize = 0;
+      for (WindowedDataSegment segment : segments) {
+        totalSize += segment.getSegment().getSize();
+      }
+      int mapTask = ((JobConf) conf).getNumMapTasks();
+      if (mapTask > 0) {
+        maxSize = totalSize / mapTask;
+      }
+    }
 
     if (maxSize > 0) {
       //combining is to happen, let us sort the segments list by size so that they
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
index 683183bf879..5e685066cd3 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
@@ -28,7 +28,6 @@ import io.druid.indexer.JobHelper;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,7 +54,7 @@ public class DatasourceInputFormatTest
 {
   private List<WindowedDataSegment> segments;
   private List<LocatedFileStatus> locations;
-  private Configuration config;
+  private JobConf config;
   private JobContext context;
 
   @Before
@@ -142,7 +141,7 @@ public class DatasourceInputFormatTest
         )
     );
 
-    config = new Configuration();
+    config = new JobConf();
     config.set(
         DatasourceInputFormat.CONF_INPUT_SEGMENTS,
         new DefaultObjectMapper().writeValueAsString(segments)
@@ -238,6 +237,34 @@ public class DatasourceInputFormatTest
     Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
   }
 
+  @Test
+  public void testGetSplitsCombineCalculated() throws Exception
+  {
+    config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "-1");
+    config.setNumMapTasks(3);
+    List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
+
+    Assert.assertEquals(3, splits.size());
+
+    Assert.assertEquals(
+        Sets.newHashSet(segments.get(0)),
+        Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
+    );
+    Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(0).getLocations());
+
+    Assert.assertEquals(
+        Sets.newHashSet(segments.get(2)),
+        Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
+    );
+    Assert.assertArrayEquals(new String[]{"s2", "s3"}, splits.get(1).getLocations());
+
+    Assert.assertEquals(
+        Sets.newHashSet(segments.get(1)),
+        Sets.newHashSet((((DatasourceInputSplit) splits.get(2)).getSegments()))
+    );
+    Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(2).getLocations());
+  }
+
   @Test
   public void testGetRecordReader() throws Exception
   {
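
Usage sketch (not part of the patch): the `ioConfig` fragment below shows how the documented `maxSplitSize` of -1 might be wired into a Hadoop reindexing spec. The datasource name, interval, and the exact contents of `ingestionSpec` are illustrative placeholders; the map-task count itself comes from the usual Hadoop job properties (`mapred.map.tasks` or `mapreduce.job.maps`).

    "ioConfig": {
      "type": "hadoop",
      "inputSpec": {
        "type": "dataSource",
        "ingestionSpec": {
          "dataSource": "wikipedia",
          "intervals": ["2014-10-20T00:00:00Z/P2W"]
        },
        "maxSplitSize": -1
      }
    }

With `maxSplitSize` set to -1 and, say, a map-task count of 3, the total size of the selected segments is divided by 3 to derive the per-split size, which is the calculation the new block in `DatasourceInputFormat.getSplits` performs and the new `testGetSplitsCombineCalculated` test exercises.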