mirror of https://github.com/apache/druid.git
buildV9Directly in MergeTask and AppendTask (#3976)
* buildV9Directly in MergeTask and AppendTask * add doc
This commit is contained in:
parent
78b8a57ad6
commit
ef6a19c81b
|
@ -162,6 +162,7 @@ Append tasks append a list of segments together into a single segment (one after
|
|||
"id": <task_id>,
|
||||
"dataSource": <task_datasource>,
|
||||
"segments": <JSON list of DataSegment objects to append>,
|
||||
"buildV9Directly": <true or false, default true>,
|
||||
"aggregations": <optional list of aggregators>
|
||||
}
|
||||
```
|
||||
|
@ -180,6 +181,7 @@ The grammar is:
|
|||
"dataSource": <task_datasource>,
|
||||
"aggregations": <list of aggregators>,
|
||||
"rollup": <whether or not to rollup data during a merge>,
|
||||
"buildV9Directly": <true or false, default true>,
|
||||
"segments": <JSON list of DataSegment objects to merge>
|
||||
}
|
||||
```
|
||||
|
|
|
@ -29,6 +29,7 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Ordering;
|
||||
import io.druid.indexing.common.TaskToolbox;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.IndexMerger;
|
||||
import io.druid.segment.IndexSpec;
|
||||
import io.druid.segment.IndexableAdapter;
|
||||
import io.druid.segment.QueryableIndexIndexableAdapter;
|
||||
|
@ -49,9 +50,10 @@ import java.util.Map;
|
|||
*/
|
||||
public class AppendTask extends MergeTaskBase
|
||||
{
|
||||
|
||||
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
|
||||
private final IndexSpec indexSpec;
|
||||
private final List<AggregatorFactory> aggregators;
|
||||
private final Boolean buildV9Directly;
|
||||
|
||||
@JsonCreator
|
||||
public AppendTask(
|
||||
|
@ -60,12 +62,14 @@ public class AppendTask extends MergeTaskBase
|
|||
@JsonProperty("segments") List<DataSegment> segments,
|
||||
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
super(id, dataSource, segments, context);
|
||||
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
|
||||
this.aggregators = aggregators;
|
||||
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -131,7 +135,8 @@ public class AppendTask extends MergeTaskBase
|
|||
);
|
||||
}
|
||||
|
||||
return toolbox.getIndexMerger().append(
|
||||
IndexMerger indexMerger = buildV9Directly ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger();
|
||||
return indexMerger.append(
|
||||
adapters,
|
||||
aggregators == null ? null : aggregators.toArray(new AggregatorFactory[aggregators.size()]),
|
||||
outDir,
|
||||
|
|
|
@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.Lists;
|
||||
import io.druid.indexing.common.TaskToolbox;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.IndexMerger;
|
||||
import io.druid.segment.IndexSpec;
|
||||
import io.druid.segment.QueryableIndex;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
@ -42,10 +43,12 @@ import java.util.Map;
|
|||
*/
|
||||
public class MergeTask extends MergeTaskBase
|
||||
{
|
||||
private static final Boolean defaultBuildV9Directly = Boolean.TRUE;
|
||||
@JsonIgnore
|
||||
private final List<AggregatorFactory> aggregators;
|
||||
private final Boolean rollup;
|
||||
private final IndexSpec indexSpec;
|
||||
private final Boolean buildV9Directly;
|
||||
|
||||
@JsonCreator
|
||||
public MergeTask(
|
||||
|
@ -55,6 +58,7 @@ public class MergeTask extends MergeTaskBase
|
|||
@JsonProperty("aggregations") List<AggregatorFactory> aggregators,
|
||||
@JsonProperty("rollup") Boolean rollup,
|
||||
@JsonProperty("indexSpec") IndexSpec indexSpec,
|
||||
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
|
||||
@JsonProperty("context") Map<String, Object> context
|
||||
)
|
||||
{
|
||||
|
@ -62,13 +66,15 @@ public class MergeTask extends MergeTaskBase
|
|||
this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations");
|
||||
this.rollup = rollup == null ? Boolean.TRUE : rollup;
|
||||
this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
|
||||
this.buildV9Directly = buildV9Directly == null ? defaultBuildV9Directly : buildV9Directly;
|
||||
}
|
||||
|
||||
@Override
|
||||
public File merge(final TaskToolbox toolbox, final Map<DataSegment, File> segments, final File outDir)
|
||||
throws Exception
|
||||
{
|
||||
return toolbox.getIndexMerger().mergeQueryableIndex(
|
||||
IndexMerger indexMerger = buildV9Directly ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger();
|
||||
return indexMerger.mergeQueryableIndex(
|
||||
Lists.transform(
|
||||
ImmutableList.copyOf(segments.values()),
|
||||
new Function<File, QueryableIndex>()
|
||||
|
|
|
@ -298,6 +298,7 @@ public class TaskSerdeTest
|
|||
aggregators,
|
||||
true,
|
||||
indexSpec,
|
||||
true,
|
||||
null
|
||||
);
|
||||
|
||||
|
@ -529,6 +530,7 @@ public class TaskSerdeTest
|
|||
new CountAggregatorFactory("cnt")
|
||||
),
|
||||
indexSpec,
|
||||
true,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue