diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 64fa14679fc..507f589f459 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -162,6 +162,7 @@ Append tasks append a list of segments together into a single segment (one after "id": , "dataSource": , "segments": , + "buildV9Directly": , "aggregations": } ``` @@ -180,6 +181,7 @@ The grammar is: "dataSource": , "aggregations": , "rollup": , + "buildV9Directly": , "segments": } ``` diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java index 36fd093c160..b53a6cd64a1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppendTask.java @@ -29,6 +29,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import io.druid.indexing.common.TaskToolbox; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.IndexableAdapter; import io.druid.segment.QueryableIndexIndexableAdapter; @@ -49,9 +50,10 @@ import java.util.Map; */ public class AppendTask extends MergeTaskBase { - + private static final Boolean defaultBuildV9Directly = Boolean.TRUE; private final IndexSpec indexSpec; private final List aggregators; + private final Boolean buildV9Directly; @JsonCreator public AppendTask( @@ -60,12 +62,14 @@ public class AppendTask extends MergeTaskBase @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("context") Map context ) { super(id, dataSource, segments, context); this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; this.aggregators = aggregators; + this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly; } @Override @@ -131,7 +135,8 @@ public class AppendTask extends MergeTaskBase ); } - return toolbox.getIndexMerger().append( + IndexMerger indexMerger = buildV9Directly ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger(); + return indexMerger.append( adapters, aggregators == null ? null : aggregators.toArray(new AggregatorFactory[aggregators.size()]), outDir, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java index 9df49936e09..7cc914f0df5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import io.druid.indexing.common.TaskToolbox; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.QueryableIndex; import io.druid.timeline.DataSegment; @@ -42,10 +43,12 @@ import java.util.Map; */ public class MergeTask extends MergeTaskBase { + private static final Boolean defaultBuildV9Directly = Boolean.TRUE; @JsonIgnore private final List aggregators; private final Boolean rollup; private final IndexSpec indexSpec; + private final Boolean buildV9Directly; @JsonCreator public MergeTask( @@ -55,6 +58,7 @@ public class MergeTask extends MergeTaskBase @JsonProperty("aggregations") List aggregators, @JsonProperty("rollup") Boolean rollup, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("context") Map context ) { @@ -62,13 +66,15 @@ public class MergeTask extends MergeTaskBase this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations"); this.rollup = rollup == null ? Boolean.TRUE : rollup; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; + this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly; } @Override public File merge(final TaskToolbox toolbox, final Map segments, final File outDir) throws Exception { - return toolbox.getIndexMerger().mergeQueryableIndex( + IndexMerger indexMerger = buildV9Directly ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger(); + return indexMerger.mergeQueryableIndex( Lists.transform( ImmutableList.copyOf(segments.values()), new Function() diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 7003e4d816f..f23f4f392ef 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -298,6 +298,7 @@ public class TaskSerdeTest aggregators, true, indexSpec, + true, null ); @@ -529,6 +530,7 @@ public class TaskSerdeTest new CountAggregatorFactory("cnt") ), indexSpec, + true, null );