From 9fe1b28ee5451fb948a339157fed70a872c8e264 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Sat, 23 Jan 2016 00:13:09 -0600 Subject: [PATCH] provide configuration to enable usage of Off heap merging for groupBy query --- .../query/groupby/GroupByQueryHelper.java | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index c0ab3f5913d..49caa8013bd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IndexSizeExceededException; +import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import java.nio.ByteBuffer; @@ -45,7 +46,6 @@ public class GroupByQueryHelper final GroupByQuery query, final GroupByQueryConfig config, StupidPool bufferPool - ) { final QueryGranularity gran = query.getGranularity(); @@ -77,15 +77,30 @@ public class GroupByQueryHelper } } ); - final IncrementalIndex index = new OnheapIncrementalIndex( - // use granularity truncated min timestamp - // since incoming truncated timestamps may precede timeStart - granTimeStart, - gran, - aggs.toArray(new AggregatorFactory[aggs.size()]), - false, - config.getMaxResults() - ); + final IncrementalIndex index; + + if (query.getContextValue("useOffheap", false)) { + index = new OffheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + false, + config.getMaxResults(), + bufferPool + ); + } else { + index = new OnheapIncrementalIndex( + // use granularity truncated min timestamp + // since incoming truncated timestamps may precede timeStart + granTimeStart, + gran, + aggs.toArray(new AggregatorFactory[aggs.size()]), + false, + config.getMaxResults() + ); + } Accumulator accumulator = new Accumulator() {