From 68631d89e975ab2edc01421519c09a8c7b31b983 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 1 Jul 2015 08:36:09 -0700 Subject: [PATCH 1/2] Allow realtime nodes to have multiple shards of the same datasource --- .../segment/realtime/RealtimeManager.java | 97 ++++++++++++++----- 1 file changed, 71 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 6f1080b8ea3..64ec2b247c6 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -28,6 +28,7 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.guava.CloseQuietly; +import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; @@ -51,9 +52,12 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumbers; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -69,7 +73,7 @@ public class RealtimeManager implements QuerySegmentWalker /** * key=data source name,value=FireChiefs of all partition of that data source */ - private final Map> chiefs; + private final Map> chiefs; @Inject public RealtimeManager( @@ -90,12 +94,12 @@ public class RealtimeManager implements QuerySegmentWalker DataSchema schema = fireDepartment.getDataSchema(); final FireChief chief = new FireChief(fireDepartment); - List chiefs = this.chiefs.get(schema.getDataSource()); + Map chiefs = this.chiefs.get(schema.getDataSource()); if (chiefs == null) { - chiefs = new ArrayList(); + chiefs = new HashMap(); this.chiefs.put(schema.getDataSource(), chiefs); } - chiefs.add(chief); + chiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); chief.setName( String.format( @@ -112,8 +116,8 @@ public class RealtimeManager implements QuerySegmentWalker @LifecycleStop public void stop() { - for (Iterable chiefs : this.chiefs.values()) { - for (FireChief chief : chiefs) { + for (Map chiefs : this.chiefs.values()) { + for (FireChief chief : chiefs.values()) { CloseQuietly.close(chief); } } @@ -121,12 +125,12 @@ public class RealtimeManager implements QuerySegmentWalker public FireDepartmentMetrics getMetrics(String datasource) { - List chiefs = this.chiefs.get(datasource); + Map chiefs = this.chiefs.get(datasource); if (chiefs == null) { return null; } FireDepartmentMetrics snapshot = null; - for (FireChief chief : chiefs) { + for (FireChief chief : chiefs.values()) { if (snapshot == null) { snapshot = chief.getMetrics().snapshot(); } else { @@ -139,30 +143,71 @@ public class RealtimeManager implements QuerySegmentWalker @Override public QueryRunner getQueryRunnerForIntervals(final Query query, Iterable intervals) { - return getQueryRunnerForSegments(query, null); + final QueryRunnerFactory> factory = conglomerate.findFactory(query); + final Iterable runners; + final List names = query.getDataSource().getNames(); + runners = Iterables.transform( + names, new Function() + { + @Override + public QueryRunner apply(String input) + { + Map chiefsOfDataSource = chiefs.get(input); + return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + Iterables.transform( + chiefsOfDataSource.values(), new Function>() + { + @Override + public QueryRunner apply(FireChief input) + { + return input.getQueryRunner(query); + } + } + ) + ) + ); + } + } + ); + return new UnionQueryRunner<>( + runners, conglomerate.findFactory(query).getToolchest() + ); } @Override - public QueryRunner getQueryRunnerForSegments(final Query query, Iterable specs) + public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); + List runners = new ArrayList(); + for (String dataSource : query.getDataSource().getNames()) { + final Map dataSourceChiefs = RealtimeManager.this.chiefs.get(dataSource); + if (dataSourceChiefs == null) { + continue; + } - Iterable chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames())); - return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - chiefsOfDataSource, new Function>() - { - @Override - public QueryRunner apply(FireChief input) - { - return input.getQueryRunner(query); - } - } - ) - ) + QueryToolChest> toolchest = factory.getToolchest(); + Iterable> subRunners = Iterables.transform( + specs, + new Function>() + { + @Nullable + @Override + public QueryRunner apply(SegmentDescriptor spec) + { + FireChief retVal = dataSourceChiefs.get(spec.getPartitionNumber()); + return retVal == null ? new NoopQueryRunner() : retVal.getQueryRunner(query); + } + } + ); + runners.add( + toolchest.mergeResults(factory.mergeRunners(MoreExecutors.sameThreadExecutor(), subRunners)) + ); + } + return new UnionQueryRunner<>( + runners, conglomerate.findFactory(query).getToolchest() ); } From 4edcb1b86157a0b028f75a89184c8a21c31014d7 Mon Sep 17 00:00:00 2001 From: Bingkun Guo Date: Thu, 25 Feb 2016 00:37:07 -0600 Subject: [PATCH 2/2] Refactor FireChief + UTs for RealtimeManagerTest Add tests that verify whether RealtimeManager is querying the correct FireChief for a specific partition make FireChief static and package private, add latches in the UT --- .../segment/realtime/RealtimeManager.java | 135 +++---- .../segment/realtime/RealtimeManagerTest.java | 367 +++++++++++++++++- 2 files changed, 428 insertions(+), 74 deletions(-) diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 64ec2b247c6..85d59b4b53f 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -28,7 +28,6 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.common.guava.CloseQuietly; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; @@ -52,11 +51,8 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumbers; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,7 +67,7 @@ public class RealtimeManager implements QuerySegmentWalker private final QueryRunnerFactoryConglomerate conglomerate; /** - * key=data source name,value=FireChiefs of all partition of that data source + * key=data source name,value=mappings of partition number to FireChief */ private final Map> chiefs; @@ -87,19 +83,30 @@ public class RealtimeManager implements QuerySegmentWalker this.chiefs = Maps.newHashMap(); } + RealtimeManager( + List fireDepartments, + QueryRunnerFactoryConglomerate conglomerate, + Map> chiefs + ) + { + this.fireDepartments = fireDepartments; + this.conglomerate = conglomerate; + this.chiefs = chiefs; + } + @LifecycleStart public void start() throws IOException { for (final FireDepartment fireDepartment : fireDepartments) { - DataSchema schema = fireDepartment.getDataSchema(); + final DataSchema schema = fireDepartment.getDataSchema(); - final FireChief chief = new FireChief(fireDepartment); - Map chiefs = this.chiefs.get(schema.getDataSource()); - if (chiefs == null) { - chiefs = new HashMap(); - this.chiefs.put(schema.getDataSource(), chiefs); + final FireChief chief = new FireChief(fireDepartment, conglomerate); + Map partitionChiefs = chiefs.get(schema.getDataSource()); + if (partitionChiefs == null) { + partitionChiefs = new HashMap<>(); + chiefs.put(schema.getDataSource(), partitionChiefs); } - chiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); + partitionChiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief); chief.setName( String.format( @@ -144,36 +151,24 @@ public class RealtimeManager implements QuerySegmentWalker public QueryRunner getQueryRunnerForIntervals(final Query query, Iterable intervals) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final Iterable runners; - final List names = query.getDataSource().getNames(); - runners = Iterables.transform( - names, new Function() - { - @Override - public QueryRunner apply(String input) - { - Map chiefsOfDataSource = chiefs.get(input); - return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - chiefsOfDataSource.values(), new Function>() - { - @Override - public QueryRunner apply(FireChief input) - { - return input.getQueryRunner(query); - } - } - ) - ) - ); - } - } - ); - return new UnionQueryRunner<>( - runners, conglomerate.findFactory(query).getToolchest() + final Map partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource() + .getNames())); + + return partitionChiefs == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + Iterables.transform( + partitionChiefs.values(), new Function>() + { + @Override + public QueryRunner apply(FireChief fireChief) + { + return fireChief.getQueryRunner(query); + } + } + ) + ) ); } @@ -181,52 +176,46 @@ public class RealtimeManager implements QuerySegmentWalker public QueryRunner getQueryRunnerForSegments(final Query query, final Iterable specs) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); - List runners = new ArrayList(); - for (String dataSource : query.getDataSource().getNames()) { - final Map dataSourceChiefs = RealtimeManager.this.chiefs.get(dataSource); - if (dataSourceChiefs == null) { - continue; - } + final Map partitionChiefs = chiefs.get(Iterables.getOnlyElement(query.getDataSource() + .getNames())); - QueryToolChest> toolchest = factory.getToolchest(); - Iterable> subRunners = Iterables.transform( - specs, - new Function>() - { - @Nullable - @Override - public QueryRunner apply(SegmentDescriptor spec) - { - FireChief retVal = dataSourceChiefs.get(spec.getPartitionNumber()); - return retVal == null ? new NoopQueryRunner() : retVal.getQueryRunner(query); - } - } - ); - runners.add( - toolchest.mergeResults(factory.mergeRunners(MoreExecutors.sameThreadExecutor(), subRunners)) - ); - } - return new UnionQueryRunner<>( - runners, conglomerate.findFactory(query).getToolchest() - ); + return partitionChiefs == null + ? new NoopQueryRunner() + : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + Iterables.transform( + specs, + new Function>() + { + @Override + public QueryRunner apply(SegmentDescriptor spec) + { + final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber()); + return retVal == null ? new NoopQueryRunner() : retVal.getQueryRunner(query); + } + } + ) + ) + ); } - private class FireChief extends Thread implements Closeable + static class FireChief extends Thread implements Closeable { private final FireDepartment fireDepartment; private final FireDepartmentMetrics metrics; private final RealtimeTuningConfig config; + private final QueryRunnerFactoryConglomerate conglomerate; private volatile Firehose firehose = null; private volatile FirehoseV2 firehoseV2 = null; private volatile Plumber plumber = null; private volatile boolean normalExit = true; - public FireChief( - FireDepartment fireDepartment - ) + public FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate) { this.fireDepartment = fireDepartment; + this.conglomerate = conglomerate; this.config = fireDepartment.getTuningConfig(); this.metrics = fireDepartment.getMetrics(); } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 0f1f31571fb..48e60398a12 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -22,10 +22,14 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; +import io.druid.collections.StupidPool; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -38,8 +42,23 @@ import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Query; import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.SegmentDescriptor; +import io.druid.query.TestQueryRunners; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.groupby.GroupByQuery; +import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryEngine; +import io.druid.query.groupby.GroupByQueryQueryToolChest; +import io.druid.query.groupby.GroupByQueryRunnerFactory; +import io.druid.query.groupby.GroupByQueryRunnerTestHelper; +import io.druid.segment.TestHelper; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; @@ -48,18 +67,23 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; +import io.druid.timeline.partition.LinearShardSpec; import io.druid.utils.Runnables; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** @@ -68,10 +92,13 @@ public class RealtimeManagerTest { private RealtimeManager realtimeManager; private RealtimeManager realtimeManager2; + private RealtimeManager realtimeManager3; private DataSchema schema; private DataSchema schema2; private TestPlumber plumber; private TestPlumber plumber2; + private QueryRunnerFactory factory; + private CountDownLatch chiefStartedLatch; @Before public void setUp() throws Exception @@ -180,6 +207,134 @@ public class RealtimeManagerTest ), null ); + + factory = initFactory(); + + RealtimeIOConfig ioConfig3 = new RealtimeIOConfig( + new FirehoseFactory() + { + @Override + public Firehose connect(InputRowParser parser) throws IOException + { + return new InfiniteTestFirehose(); + } + }, + new PlumberSchool() + { + @Override + public Plumber findPlumber( + DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + ) + { + return plumber; + } + }, + null + ); + + RealtimeTuningConfig tuningConfig_0 = new RealtimeTuningConfig( + 1, + new Period("P1Y"), + null, + null, + null, + null, + null, + new LinearShardSpec(0), + null, + null, + 0, + 0, + null + ); + + RealtimeTuningConfig tuningConfig_1 = new RealtimeTuningConfig( + 1, + new Period("P1Y"), + null, + null, + null, + null, + null, + new LinearShardSpec(1), + null, + null, + 0, + 0, + null + ); + + DataSchema schema2 = new DataSchema( + "testing", + null, + new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, + new UniformGranularitySpec(Granularity.HOUR, QueryGranularity.NONE, null), + jsonMapper + ); + + FireDepartment department_0 = new FireDepartment(schema2, ioConfig3, tuningConfig_0); + FireDepartment department_1 = new FireDepartment(schema2, ioConfig3, tuningConfig_1); + + QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate() + { + @Override + public > QueryRunnerFactory findFactory(QueryType query) + { + return factory; + } + }; + + chiefStartedLatch = new CountDownLatch(2); + + RealtimeManager.FireChief fireChief_0 = new RealtimeManager.FireChief(department_0, conglomerate) + { + @Override + public void run() + { + super.initPlumber(); + chiefStartedLatch.countDown(); + } + }; + + RealtimeManager.FireChief fireChief_1 = new RealtimeManager.FireChief(department_1, conglomerate) + { + @Override + public void run() + { + super.initPlumber(); + chiefStartedLatch.countDown(); + } + }; + + + realtimeManager3 = new RealtimeManager( + Arrays.asList(department_0, department_1), + conglomerate, + ImmutableMap.>of( + "testing", + ImmutableMap.of( + 0, + fireChief_0, + 1, + fireChief_1 + ) + ) + ); + + startFireChiefWithPartitionNum(fireChief_0, 0); + startFireChiefWithPartitionNum(fireChief_1, 1); + } + + private void startFireChiefWithPartitionNum(RealtimeManager.FireChief fireChief, int partitionNum) + { + fireChief.setName( + String.format( + "chief-%s[%s]", + "testing", + partitionNum + ) + ); + fireChief.start(); } @Test @@ -224,6 +379,176 @@ public class RealtimeManagerTest Assert.assertEquals(0, plumber2.getPersistCount()); } + @Test(timeout = 5_000L) + public void testQueryWithInterval() throws IOException, InterruptedException + { + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 270L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 236L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 316L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 240L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 5740L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 242L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 5800L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 156L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 238L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 294L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 224L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 332L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 226L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4894L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 228L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 5010L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 194L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 252L) + ); + + chiefStartedLatch.await(); + + for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { + plumber.setRunner(runner); + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForIntervals( + query, + QueryRunnerTestHelper.firstToThird.getIntervals() + ), + query + ); + + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + } + + @Test(timeout = 5_000L) + public void testQueryWithSegmentSpec() throws IOException, InterruptedException + { + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), + + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L) + ); + + chiefStartedLatch.await(); + + for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { + plumber.setRunner(runner); + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + new SegmentDescriptor( + new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), + "ver", + 0 + )) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + new SegmentDescriptor( + new Interval("2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), + "ver", + 1 + )) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } + + } + + private GroupByQueryRunnerFactory initFactory() + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final StupidPool pool = new StupidPool<>( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(1024 * 1024); + } + } + ); + final GroupByQueryConfig config = new GroupByQueryConfig(); + config.setMaxIntermediateRows(10000); + final Supplier configSupplier = Suppliers.ofInstance(config); + final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool); + return new GroupByQueryRunnerFactory( + engine, + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + configSupplier, + new GroupByQueryQueryToolChest( + configSupplier, mapper, engine, TestQueryRunners.pool, + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ), + TestQueryRunners.pool + ); + } + + @After + public void tearDown() throws Exception + { + realtimeManager.stop(); + realtimeManager2.stop(); + realtimeManager3.stop(); + } + private TestInputRowHolder makeRow(final long timestamp) { return new TestInputRowHolder(timestamp, null); @@ -304,6 +629,35 @@ public class RealtimeManagerTest } } + private static class InfiniteTestFirehose implements Firehose + { + private boolean hasMore = true; + + @Override + public boolean hasMore() + { + return hasMore; + } + + @Override + public InputRow nextRow() + { + return null; + } + + @Override + public Runnable commit() + { + return Runnables.getNoopRunnable(); + } + + @Override + public void close() throws IOException + { + hasMore = false; + } + } + private static class TestFirehose implements Firehose { private final Iterator rows; @@ -419,6 +773,8 @@ public class RealtimeManagerTest private volatile boolean finishedJob = false; private volatile int persistCount = 0; + private QueryRunner runner; + private TestPlumber(Sink sink) { this.sink = sink; @@ -473,7 +829,10 @@ public class RealtimeManagerTest @Override public QueryRunner getQueryRunner(Query query) { - throw new UnsupportedOperationException(); + if (runner == null) { + throw new UnsupportedOperationException(); + } + return runner; } @Override @@ -487,5 +846,11 @@ public class RealtimeManagerTest { finishedJob = true; } + + public void setRunner(QueryRunner runner) + { + this.runner = runner; + } } + }