diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 31938f8631b..552bea9085b 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -44,6 +44,7 @@ import io.druid.segment.incremental.IncrementalIndex; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -84,7 +85,7 @@ public class GroupByParallelQueryRunner implements QueryRunner configSupplier.get(), bufferPool ); - final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); + final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final boolean bySegment = query.getContextBySegment(false); final int priority = query.getContextPriority(0); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 597e3ba821f..5f07b6fcf39 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -36,7 +36,10 @@ import io.druid.segment.incremental.OffheapIncrementalIndex; import io.druid.segment.incremental.OnheapIncrementalIndex; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; public class GroupByQueryHelper { @@ -130,18 +133,19 @@ public class GroupByQueryHelper return new Pair<>(index, accumulator); } - public static Pair> createBySegmentAccumulatorPair() + public static Pair> createBySegmentAccumulatorPair() { - List init = Lists.newArrayList(); - Accumulator accumulator = new Accumulator() + // In parallel query runner multiple threads add to this queue concurrently + Queue init = new ConcurrentLinkedQueue<>(); + Accumulator accumulator = new Accumulator() { @Override - public List accumulate(List accumulated, T in) + public Queue accumulate(Queue accumulated, T in) { if(in == null){ throw new ISE("Cannot have null result"); } - accumulated.add(in); + accumulated.offer(in); return accumulated; } }; diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index 9ac86ffc523..58ccf98c083 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -49,6 +49,7 @@ import io.druid.segment.incremental.IncrementalIndex; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -115,7 +116,7 @@ public class GroupByQueryRunnerFactory implements QueryRunnerFactory> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); + final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final int priority = query.getContextPriority(0); final boolean bySegment = query.getContextBySegment(false); diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 89dec7c2e71..da31afa0dde 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -22,6 +22,7 @@ import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -33,9 +34,14 @@ import io.druid.data.input.Row; import io.druid.granularity.PeriodGranularity; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.BySegmentResultValue; +import io.druid.query.BySegmentResultValueClass; +import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; +import io.druid.query.Result; import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -88,6 +94,8 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @RunWith(Parameterized.class) public class GroupByQueryRunnerTest @@ -2337,4 +2345,59 @@ public class GroupByQueryRunnerTest Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } + + @Test + public void testBySegmentResults() + { + int segmentCount = 32; + Result singleSegmentResult = new Result( + new DateTime("2011-01-12T00:00:00.000Z"), + new BySegmentResultValueClass( + Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow( + "2011-04-01", + "alias", + "mezzanine", + "rows", + 6L, + "idx", + 4420L + ) + ), "testSegment", new Interval("2011-04-02T00:00:00.000Z/2011-04-04T00:00:00.000Z") + ) + ); + List bySegmentResults = Lists.newArrayList(); + for (int i = 0; i < segmentCount; i++) { + bySegmentResults.add(singleSegmentResult); + } + GroupByQuery.Builder builder = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setInterval("2011-04-02/2011-04-04") + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) + .setDimFilter(new SelectorDimFilter("quality", "mezzanine")) + .setContext(ImmutableMap.of("bySegment", true)); + final GroupByQuery fullQuery = builder.build(); + QueryToolChest toolChest = factory.getToolchest(); + + List> singleSegmentRunners = Lists.newArrayList(); + for (int i = 0; i < segmentCount; i++) { + singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner)); + } + ExecutorService exec = Executors.newCachedThreadPool(); + QueryRunner theRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(factory.mergeRunners(Executors.newCachedThreadPool(), singleSegmentRunners)), + toolChest + ); + + TestHelper.assertExpectedObjects(bySegmentResults, theRunner.run(fullQuery, Maps.newHashMap()), ""); + exec.shutdownNow(); + } }