diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 84357cb13ab..8f1d70f49fa 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -400,6 +400,18 @@ public class QueryRunnerTestHelper }; } + public static > QueryRunner makeQueryRunner( + QueryRunnerFactory factory, + String resourceFileName + ) + { + return makeQueryRunner( + factory, + segmentId, + new IncrementalIndexSegment(TestIndex.makeRealtimeIndex(resourceFileName), segmentId) + ); + } + public static > QueryRunner makeQueryRunner( QueryRunnerFactory factory, Segment adapter diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 7aa22aa238e..fb82f837771 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -163,7 +163,7 @@ public class TestIndex } } - private static IncrementalIndex makeRealtimeIndex(final String resourceFilename) + public static IncrementalIndex makeRealtimeIndex(final String resourceFilename) { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); if (resource == null) { diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 85d59b4b53f..5cdd6e33c97 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -44,6 +44,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; +import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.plumber.Committers; @@ -192,7 +193,9 @@ public class RealtimeManager implements QuerySegmentWalker public QueryRunner apply(SegmentDescriptor spec) { final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber()); - return retVal == null ? new NoopQueryRunner() : retVal.getQueryRunner(query); + return retVal == null + ? new NoopQueryRunner() + : retVal.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(spec))); } } ) diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 48e60398a12..2d0bcac50ed 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -20,12 +20,15 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; @@ -40,6 +43,7 @@ import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.BaseQuery; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -58,6 +62,10 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.query.groupby.GroupByQueryRunnerTestHelper; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.MultipleSpecificSegmentSpec; +import io.druid.query.spec.SpecificSegmentQueryRunner; +import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; @@ -75,6 +83,7 @@ import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -90,6 +99,8 @@ import java.util.concurrent.TimeUnit; */ public class RealtimeManagerTest { + private static QueryRunnerFactory factory; + private RealtimeManager realtimeManager; private RealtimeManager realtimeManager2; private RealtimeManager realtimeManager3; @@ -97,8 +108,16 @@ public class RealtimeManagerTest private DataSchema schema2; private TestPlumber plumber; private TestPlumber plumber2; - private QueryRunnerFactory factory; private CountDownLatch chiefStartedLatch; + private RealtimeTuningConfig tuningConfig_0; + private RealtimeTuningConfig tuningConfig_1; + private DataSchema schema3; + + @BeforeClass + public static void setupStatic() + { + factory = initFactory(); + } @Before public void setUp() throws Exception @@ -208,31 +227,7 @@ public class RealtimeManagerTest null ); - factory = initFactory(); - - RealtimeIOConfig ioConfig3 = new RealtimeIOConfig( - new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - return new InfiniteTestFirehose(); - } - }, - new PlumberSchool() - { - @Override - public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics - ) - { - return plumber; - } - }, - null - ); - - RealtimeTuningConfig tuningConfig_0 = new RealtimeTuningConfig( + tuningConfig_0 = new RealtimeTuningConfig( 1, new Period("P1Y"), null, @@ -248,7 +243,7 @@ public class RealtimeManagerTest null ); - RealtimeTuningConfig tuningConfig_1 = new RealtimeTuningConfig( + tuningConfig_1 = new RealtimeTuningConfig( 1, new Period("P1Y"), null, @@ -264,7 +259,7 @@ public class RealtimeManagerTest null ); - DataSchema schema2 = new DataSchema( + schema3 = new DataSchema( "testing", null, new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, @@ -272,8 +267,8 @@ public class RealtimeManagerTest jsonMapper ); - FireDepartment department_0 = new FireDepartment(schema2, ioConfig3, tuningConfig_0); - FireDepartment department_1 = new FireDepartment(schema2, ioConfig3, tuningConfig_1); + FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0); + FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1); QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate() { @@ -379,7 +374,7 @@ public class RealtimeManagerTest Assert.assertEquals(0, plumber2.getPersistCount()); } - @Test(timeout = 5_000L) + @Test(timeout = 10_000L) public void testQueryWithInterval() throws IOException, InterruptedException { List expectedResults = Arrays.asList( @@ -407,7 +402,6 @@ public class RealtimeManagerTest chiefStartedLatch.await(); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { - plumber.setRunner(runner); GroupByQuery query = GroupByQuery .builder() .setDataSource(QueryRunnerTestHelper.dataSource) @@ -421,6 +415,8 @@ public class RealtimeManagerTest ) .setGranularity(QueryRunnerTestHelper.dayGran) .build(); + plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); + plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); Iterable results = GroupByQueryRunnerTestHelper.runQuery( factory, @@ -436,7 +432,7 @@ public class RealtimeManagerTest } - @Test(timeout = 5_000L) + @Test(timeout = 10_000L) public void testQueryWithSegmentSpec() throws IOException, InterruptedException { List expectedResults = Arrays.asList( @@ -464,7 +460,6 @@ public class RealtimeManagerTest chiefStartedLatch.await(); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { - plumber.setRunner(runner); GroupByQuery query = GroupByQuery .builder() .setDataSource(QueryRunnerTestHelper.dataSource) @@ -478,6 +473,8 @@ public class RealtimeManagerTest ) .setGranularity(QueryRunnerTestHelper.dayGran) .build(); + plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); + plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); Iterable results = GroupByQueryRunnerTestHelper.runQuery( factory, @@ -512,7 +509,151 @@ public class RealtimeManagerTest } - private GroupByQueryRunnerFactory initFactory() + @Test(timeout = 10_000L) + public void testQueryWithMultipleSegmentSpec() throws IOException, InterruptedException + { + + List expectedResults_both_partitions = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 2L, "idx", 260L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 2L, "idx", 236L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 4L, "idx", 4556L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 2L, "idx", 284L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 2L, "idx", 202L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 2L, "idx", 288L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 2L, "idx", 326L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 2L, "idx", 312L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 2L, "idx", 248L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 2L, "idx", 326L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 2L, "idx", 262L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 6L, "idx", 5126L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 2L, "idx", 254L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 6L, "idx", 5276L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 2L, "idx", 206L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 2L, "idx", 260L) + ); + + List expectedResults_single_partition_26_28 = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 1L, "idx", 130L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 2L, "idx", 2278L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 1L, "idx", 142L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 1L, "idx", 101L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 1L, "idx", 144L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 1L, "idx", 163L) + ); + + List expectedResults_single_partition_28_29 = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 1L, "idx", 156L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 1L, "idx", 124L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 1L, "idx", 163L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 1L, "idx", 131L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 3L, "idx", 2563L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 1L, "idx", 127L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 3L, "idx", 2638L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 1L, "idx", 103L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L) + ); + + chiefStartedLatch.await(); + + final Interval interval_26_28 = new Interval("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z"); + final Interval interval_28_29 = new Interval("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z"); + final SegmentDescriptor descriptor_26_28_0 = new SegmentDescriptor(interval_26_28, "ver0", 0); + final SegmentDescriptor descriptor_28_29_0 = new SegmentDescriptor(interval_28_29, "ver1", 0); + final SegmentDescriptor descriptor_26_28_1 = new SegmentDescriptor(interval_26_28, "ver0", 1); + final SegmentDescriptor descriptor_28_29_1 = new SegmentDescriptor(interval_28_29, "ver1", 1); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + descriptor_26_28_0, + descriptor_28_29_0, + descriptor_26_28_1, + descriptor_28_29_1 + ))) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final Map runnerMap = ImmutableMap.of( + interval_26_28, + QueryRunnerTestHelper.makeQueryRunner( + factory, + "druid.sample.tsv.top" + ) + , + interval_28_29, + QueryRunnerTestHelper.makeQueryRunner( + factory, + "druid.sample.tsv.bottom" + ) + ); + plumber.setRunners(runnerMap); + plumber2.setRunners(runnerMap); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery( + factory, + query.getQuerySegmentSpec().lookup(query, realtimeManager3), + query + ); + TestHelper.assertExpectedObjects(expectedResults_both_partitions, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_26_28_0) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_28_29_0) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_26_28_1) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_28_29_1) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, ""); + + } + + private static GroupByQueryRunnerFactory initFactory() { final ObjectMapper mapper = new DefaultObjectMapper(); final StupidPool pool = new StupidPool<>( @@ -773,7 +914,7 @@ public class RealtimeManagerTest private volatile boolean finishedJob = false; private volatile int persistCount = 0; - private QueryRunner runner; + private Map runners; private TestPlumber(Sink sink) { @@ -826,13 +967,45 @@ public class RealtimeManagerTest return null; } + @SuppressWarnings("unchecked") @Override - public QueryRunner getQueryRunner(Query query) + public QueryRunner getQueryRunner(final Query query) { - if (runner == null) { + if (runners == null) { throw new UnsupportedOperationException(); } - return runner; + + final BaseQuery baseQuery = (BaseQuery) query; + + if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) { + return factory.getToolchest() + .mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + Iterables.transform( + baseQuery.getIntervals(), + new Function>() + { + @Override + public QueryRunner apply(Interval input) + { + return runners.get(input); + } + } + ) + ) + ); + } + + Assert.assertEquals(1, query.getIntervals().size()); + + final SegmentDescriptor descriptor = + ((SpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptor(); + + return new SpecificSegmentQueryRunner( + runners.get(descriptor.getInterval()), + new SpecificSegmentSpec(descriptor) + ); } @Override @@ -847,9 +1020,9 @@ public class RealtimeManagerTest finishedJob = true; } - public void setRunner(QueryRunner runner) + public void setRunners(Map runners) { - this.runner = runner; + this.runners = runners; } }