support singleThreaded flag for groupBy-v2 as well (#3992)

This commit is contained in:
Himanshu 2017-03-03 12:13:06 -06:00 committed by Nishant Bangarwa
parent 4a56d7d8a0
commit e7e3c2dc5a
2 changed files with 19 additions and 2 deletions

View File

@ -167,6 +167,7 @@ When using the "v2" strategy, the following runtime properties apply:
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
@ -189,6 +190,7 @@ When using the "v2" strategy, the following query context parameters apply:
|Property|Description|
|--------|-----------|
|`groupByStrategy`|Overrides the value of `druid.query.groupBy.defaultStrategy` for this query.|
|`groupByIsSingleThreaded`|Overrides the value of `druid.query.groupBy.singleThreaded` for this query.|
|`bufferGrouperInitialBuckets`|Overrides the value of `druid.query.groupBy.bufferGrouperInitialBuckets` for this query.|
|`bufferGrouperMaxLoadFactor`|Overrides the value of `druid.query.groupBy.bufferGrouperMaxLoadFactor` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|

View File

@ -24,6 +24,7 @@ import com.google.common.base.Function;
import com.google.common.base.Predicates;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@ -128,6 +129,8 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
return new ChainedExecutionQueryRunner(exec, queryWatcher, queryables).run(query, responseContext);
}
final boolean isSingleThreaded = querySpecificConfig.isSingleThreaded();
final AggregatorFactory[] combiningAggregatorFactories = new AggregatorFactory[query.getAggregatorSpecs().size()];
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
combiningAggregatorFactories[i] = query.getAggregatorSpecs().get(i).getCombiningFactory();
@ -211,7 +214,7 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
);
}
return exec.submit(
ListenableFuture<Boolean> future = exec.submit(
new AbstractPrioritizedCallable<Boolean>(priority)
{
@Override
@ -237,13 +240,25 @@ public class GroupByMergingQueryRunnerV2 implements QueryRunner<Row>
}
}
);
if (isSingleThreaded) {
waitForFutureCompletion(
query,
Futures.allAsList(ImmutableList.of(future)),
timeoutAt - System.currentTimeMillis()
);
}
return future;
}
}
)
)
);
waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis());
if (!isSingleThreaded) {
waitForFutureCompletion(query, futures, timeoutAt - System.currentTimeMillis());
}
return RowBasedGrouperHelper.makeGrouperIterator(
grouper,