provide configuration to enable usage of Off heap merging for groupBy query

This commit is contained in:
Himanshu Gupta 2016-01-23 00:13:09 -06:00
parent b40c342cd1
commit 9fe1b28ee5
1 changed files with 25 additions and 10 deletions

View File

@ -32,6 +32,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OffheapIncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import java.nio.ByteBuffer;
@ -45,7 +46,6 @@ public class GroupByQueryHelper
final GroupByQuery query,
final GroupByQueryConfig config,
StupidPool<ByteBuffer> bufferPool
)
{
final QueryGranularity gran = query.getGranularity();
@ -77,7 +77,21 @@ public class GroupByQueryHelper
}
}
);
final IncrementalIndex index = new OnheapIncrementalIndex(
final IncrementalIndex index;
if (query.getContextValue("useOffheap", false)) {
index = new OffheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
config.getMaxResults(),
bufferPool
);
} else {
index = new OnheapIncrementalIndex(
// use granularity truncated min timestamp
// since incoming truncated timestamps may precede timeStart
granTimeStart,
@ -86,6 +100,7 @@ public class GroupByQueryHelper
false,
config.getMaxResults()
);
}
Accumulator<IncrementalIndex, T> accumulator = new Accumulator<IncrementalIndex, T>()
{