diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index 46edd1f917d..6e47ebaa2e2 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -139,8 +139,9 @@ public class ConcurrentGrouper implements Grouper getCombiningFactories(aggregatorFactories), combineKeySerdeFactory, executor, + sortHasNonGroupingFields, concurrencyHint, - sortHasNonGroupingFields + priority ); this.maxDictionarySizeForCombiner = combineKeySerdeFactory.getMaxDictionarySize(); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java index ad3326d7ba4..e14de0bc997 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/ParallelCombiner.java @@ -31,6 +31,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.parsers.CloseableIterator; +import io.druid.query.AbstractPrioritizedCallable; import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; @@ -71,14 +72,16 @@ public class ParallelCombiner private final ListeningExecutorService executor; private final Comparator> keyObjComparator; private final int concurrencyHint; + private final int priority; public ParallelCombiner( Supplier> combineBufferSupplier, AggregatorFactory[] combiningFactories, KeySerdeFactory combineKeySerdeFactory, ListeningExecutorService executor, + boolean sortHasNonGroupingFields, int concurrencyHint, - boolean sortHasNonGroupingFields + int priority ) { this.combineBufferSupplier = combineBufferSupplier; @@ -87,6 +90,7 @@ public class ParallelCombiner this.executor = executor; this.keyObjComparator = combineKeySerdeFactory.objectComparator(sortHasNonGroupingFields);; this.concurrencyHint = concurrencyHint; + this.priority = priority; } /** @@ -322,29 +326,37 @@ public class ParallelCombiner combiningFactories ); - final ListenableFuture future = executor.submit(() -> { - grouper.init(); + final ListenableFuture future = executor.submit( + new AbstractPrioritizedCallable(priority) + { + @Override + public Void call() throws Exception + { + grouper.init(); - try ( - CloseableIterator> mergedIterator = CloseableIterators.mergeSorted( - iterators, - keyObjComparator - ) - ) { - while (mergedIterator.hasNext()) { - final Entry next = mergedIterator.next(); + try ( + CloseableIterator> mergedIterator = CloseableIterators.mergeSorted( + iterators, + keyObjComparator + ) + ) { + while (mergedIterator.hasNext()) { + final Entry next = mergedIterator.next(); - settableColumnSelectorFactory.set(next.values); - grouper.aggregate(next.key); // grouper always returns ok or throws an exception - settableColumnSelectorFactory.set(null); + settableColumnSelectorFactory.set(next.values); + grouper.aggregate(next.key); // grouper always returns ok or throws an exception + settableColumnSelectorFactory.set(null); + } + } + catch (IOException e) { + throw Throwables.propagate(e); + } + + grouper.finish(); + return null; + } } - } - catch (IOException e) { - throw Throwables.propagate(e); - } - - grouper.finish(); - }); + ); return new Pair<>(grouper.iterator(), future); } diff --git a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java index 5c1ec778ca8..0fe75d822cc 100644 --- a/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/epinephelinae/ParallelCombinerTest.java @@ -113,8 +113,9 @@ public class ParallelCombinerTest new AggregatorFactory[]{new CountAggregatorFactory("cnt").getCombiningFactory()}, ConcurrentGrouperTest.KEY_SERDE_FACTORY, MoreExecutors.listeningDecorator(SERVICE), + false, THREAD_NUM, - false + 0 ); final int numRows = 1000;