From 4a58462fc7674f50a580debc82bba81992575f7c Mon Sep 17 00:00:00 2001 From: Bingkun Guo Date: Wed, 2 Mar 2016 02:17:33 -0600 Subject: [PATCH] update querySegmentSpec when passing query to getQueryRunner After finding the FireChief for a specific partition, Druid will need to find the specific queryRunner for each segment being queried by passing the query to FireChief. Currently Druid is passing the original query that contains all the segments need to be queried, it's possible that fireChief.getQueryRunner(query) returns more than 1 queryRunner because query.getIntervals() is not specific to a single segment. In this patch, for each segment being queried, Druid will update the query with its corresponding SpecificSegmentSpec. --- .../io/druid/query/QueryRunnerTestHelper.java | 12 + .../test/java/io/druid/segment/TestIndex.java | 2 +- .../segment/realtime/RealtimeManager.java | 5 +- .../segment/realtime/RealtimeManagerTest.java | 255 +++++++++++++++--- 4 files changed, 231 insertions(+), 43 deletions(-) diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index 84357cb13ab..8f1d70f49fa 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -400,6 +400,18 @@ public class QueryRunnerTestHelper }; } + public static > QueryRunner makeQueryRunner( + QueryRunnerFactory factory, + String resourceFileName + ) + { + return makeQueryRunner( + factory, + segmentId, + new IncrementalIndexSegment(TestIndex.makeRealtimeIndex(resourceFileName), segmentId) + ); + } + public static > QueryRunner makeQueryRunner( QueryRunnerFactory factory, Segment adapter diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index 7aa22aa238e..fb82f837771 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -163,7 +163,7 @@ public class TestIndex } } - private static IncrementalIndex makeRealtimeIndex(final String resourceFilename) + public static IncrementalIndex makeRealtimeIndex(final String resourceFilename) { final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename); if (resource == null) { diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 85d59b4b53f..5cdd6e33c97 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -44,6 +44,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.QueryToolChest; import io.druid.query.SegmentDescriptor; +import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.plumber.Committers; @@ -192,7 +193,9 @@ public class RealtimeManager implements QuerySegmentWalker public QueryRunner apply(SegmentDescriptor spec) { final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber()); - return retVal == null ? new NoopQueryRunner() : retVal.getQueryRunner(query); + return retVal == null + ? new NoopQueryRunner() + : retVal.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(spec))); } } ) diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 48e60398a12..2d0bcac50ed 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -20,12 +20,15 @@ package io.druid.segment.realtime; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.base.Stopwatch; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; @@ -40,6 +43,7 @@ import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; import io.druid.granularity.QueryGranularity; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.BaseQuery; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; @@ -58,6 +62,10 @@ import io.druid.query.groupby.GroupByQueryEngine; import io.druid.query.groupby.GroupByQueryQueryToolChest; import io.druid.query.groupby.GroupByQueryRunnerFactory; import io.druid.query.groupby.GroupByQueryRunnerTestHelper; +import io.druid.query.spec.MultipleIntervalSegmentSpec; +import io.druid.query.spec.MultipleSpecificSegmentSpec; +import io.druid.query.spec.SpecificSegmentQueryRunner; +import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; @@ -75,6 +83,7 @@ import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import java.io.IOException; @@ -90,6 +99,8 @@ import java.util.concurrent.TimeUnit; */ public class RealtimeManagerTest { + private static QueryRunnerFactory factory; + private RealtimeManager realtimeManager; private RealtimeManager realtimeManager2; private RealtimeManager realtimeManager3; @@ -97,8 +108,16 @@ public class RealtimeManagerTest private DataSchema schema2; private TestPlumber plumber; private TestPlumber plumber2; - private QueryRunnerFactory factory; private CountDownLatch chiefStartedLatch; + private RealtimeTuningConfig tuningConfig_0; + private RealtimeTuningConfig tuningConfig_1; + private DataSchema schema3; + + @BeforeClass + public static void setupStatic() + { + factory = initFactory(); + } @Before public void setUp() throws Exception @@ -208,31 +227,7 @@ public class RealtimeManagerTest null ); - factory = initFactory(); - - RealtimeIOConfig ioConfig3 = new RealtimeIOConfig( - new FirehoseFactory() - { - @Override - public Firehose connect(InputRowParser parser) throws IOException - { - return new InfiniteTestFirehose(); - } - }, - new PlumberSchool() - { - @Override - public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics - ) - { - return plumber; - } - }, - null - ); - - RealtimeTuningConfig tuningConfig_0 = new RealtimeTuningConfig( + tuningConfig_0 = new RealtimeTuningConfig( 1, new Period("P1Y"), null, @@ -248,7 +243,7 @@ public class RealtimeManagerTest null ); - RealtimeTuningConfig tuningConfig_1 = new RealtimeTuningConfig( + tuningConfig_1 = new RealtimeTuningConfig( 1, new Period("P1Y"), null, @@ -264,7 +259,7 @@ public class RealtimeManagerTest null ); - DataSchema schema2 = new DataSchema( + schema3 = new DataSchema( "testing", null, new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, @@ -272,8 +267,8 @@ public class RealtimeManagerTest jsonMapper ); - FireDepartment department_0 = new FireDepartment(schema2, ioConfig3, tuningConfig_0); - FireDepartment department_1 = new FireDepartment(schema2, ioConfig3, tuningConfig_1); + FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0); + FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1); QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate() { @@ -379,7 +374,7 @@ public class RealtimeManagerTest Assert.assertEquals(0, plumber2.getPersistCount()); } - @Test(timeout = 5_000L) + @Test(timeout = 10_000L) public void testQueryWithInterval() throws IOException, InterruptedException { List expectedResults = Arrays.asList( @@ -407,7 +402,6 @@ public class RealtimeManagerTest chiefStartedLatch.await(); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { - plumber.setRunner(runner); GroupByQuery query = GroupByQuery .builder() .setDataSource(QueryRunnerTestHelper.dataSource) @@ -421,6 +415,8 @@ public class RealtimeManagerTest ) .setGranularity(QueryRunnerTestHelper.dayGran) .build(); + plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); + plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); Iterable results = GroupByQueryRunnerTestHelper.runQuery( factory, @@ -436,7 +432,7 @@ public class RealtimeManagerTest } - @Test(timeout = 5_000L) + @Test(timeout = 10_000L) public void testQueryWithSegmentSpec() throws IOException, InterruptedException { List expectedResults = Arrays.asList( @@ -464,7 +460,6 @@ public class RealtimeManagerTest chiefStartedLatch.await(); for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) { - plumber.setRunner(runner); GroupByQuery query = GroupByQuery .builder() .setDataSource(QueryRunnerTestHelper.dataSource) @@ -478,6 +473,8 @@ public class RealtimeManagerTest ) .setGranularity(QueryRunnerTestHelper.dayGran) .build(); + plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); + plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner)); Iterable results = GroupByQueryRunnerTestHelper.runQuery( factory, @@ -512,7 +509,151 @@ public class RealtimeManagerTest } - private GroupByQueryRunnerFactory initFactory() + @Test(timeout = 10_000L) + public void testQueryWithMultipleSegmentSpec() throws IOException, InterruptedException + { + + List expectedResults_both_partitions = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 2L, "idx", 260L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 2L, "idx", 236L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 4L, "idx", 4556L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 2L, "idx", 284L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 2L, "idx", 202L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 2L, "idx", 288L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 2L, "idx", 326L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 2L, "idx", 312L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 2L, "idx", 248L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 2L, "idx", 326L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 2L, "idx", 262L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 6L, "idx", 5126L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 2L, "idx", 254L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 6L, "idx", 5276L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 2L, "idx", 206L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 2L, "idx", 260L) + ); + + List expectedResults_single_partition_26_28 = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 1L, "idx", 130L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 1L, "idx", 118L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 2L, "idx", 2278L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 1L, "idx", 142L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 1L, "idx", 101L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 1L, "idx", 144L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 1L, "idx", 163L) + ); + + List expectedResults_single_partition_28_29 = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 1L, "idx", 156L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 1L, "idx", 124L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 1L, "idx", 163L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 1L, "idx", 131L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 3L, "idx", 2563L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 1L, "idx", 127L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 3L, "idx", 2638L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 1L, "idx", 103L), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L) + ); + + chiefStartedLatch.await(); + + final Interval interval_26_28 = new Interval("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z"); + final Interval interval_28_29 = new Interval("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z"); + final SegmentDescriptor descriptor_26_28_0 = new SegmentDescriptor(interval_26_28, "ver0", 0); + final SegmentDescriptor descriptor_28_29_0 = new SegmentDescriptor(interval_28_29, "ver1", 0); + final SegmentDescriptor descriptor_26_28_1 = new SegmentDescriptor(interval_26_28, "ver0", 1); + final SegmentDescriptor descriptor_28_29_1 = new SegmentDescriptor(interval_28_29, "ver1", 1); + + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec( + new MultipleSpecificSegmentSpec( + ImmutableList.of( + descriptor_26_28_0, + descriptor_28_29_0, + descriptor_26_28_1, + descriptor_28_29_1 + ))) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("quality", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new LongSumAggregatorFactory("idx", "index") + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + final Map runnerMap = ImmutableMap.of( + interval_26_28, + QueryRunnerTestHelper.makeQueryRunner( + factory, + "druid.sample.tsv.top" + ) + , + interval_28_29, + QueryRunnerTestHelper.makeQueryRunner( + factory, + "druid.sample.tsv.bottom" + ) + ); + plumber.setRunners(runnerMap); + plumber2.setRunners(runnerMap); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery( + factory, + query.getQuerySegmentSpec().lookup(query, realtimeManager3), + query + ); + TestHelper.assertExpectedObjects(expectedResults_both_partitions, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_26_28_0) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_28_29_0) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_26_28_1) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, ""); + + results = GroupByQueryRunnerTestHelper.runQuery( + factory, + realtimeManager3.getQueryRunnerForSegments( + query, + ImmutableList.of( + descriptor_28_29_1) + ), + query + ); + TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, ""); + + } + + private static GroupByQueryRunnerFactory initFactory() { final ObjectMapper mapper = new DefaultObjectMapper(); final StupidPool pool = new StupidPool<>( @@ -773,7 +914,7 @@ public class RealtimeManagerTest private volatile boolean finishedJob = false; private volatile int persistCount = 0; - private QueryRunner runner; + private Map runners; private TestPlumber(Sink sink) { @@ -826,13 +967,45 @@ public class RealtimeManagerTest return null; } + @SuppressWarnings("unchecked") @Override - public QueryRunner getQueryRunner(Query query) + public QueryRunner getQueryRunner(final Query query) { - if (runner == null) { + if (runners == null) { throw new UnsupportedOperationException(); } - return runner; + + final BaseQuery baseQuery = (BaseQuery) query; + + if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) { + return factory.getToolchest() + .mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + Iterables.transform( + baseQuery.getIntervals(), + new Function>() + { + @Override + public QueryRunner apply(Interval input) + { + return runners.get(input); + } + } + ) + ) + ); + } + + Assert.assertEquals(1, query.getIntervals().size()); + + final SegmentDescriptor descriptor = + ((SpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptor(); + + return new SpecificSegmentQueryRunner( + runners.get(descriptor.getInterval()), + new SpecificSegmentSpec(descriptor) + ); } @Override @@ -847,9 +1020,9 @@ public class RealtimeManagerTest finishedJob = true; } - public void setRunner(QueryRunner runner) + public void setRunners(Map runners) { - this.runner = runner; + this.runners = runners; } }