Merge pull request #2583 from guobingkun/fix_multiple_specs_2

update querySegmentSpec when passing query to getQueryRunner
Fangjin Yang 2016-03-02 18:05:34 -08:00
commit d06c1c5c85
4 changed files with 231 additions and 43 deletions
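
The substance of the fix is small. RealtimeManager builds, for each requested SegmentDescriptor, a QueryRunner backed by the FireChief that serves that partition; before this change it handed that runner the query with its original, possibly multi-segment querySegmentSpec, and now it first narrows the spec to the one descriptor being served. A minimal sketch of that per-descriptor mapping, using the names from the RealtimeManager hunk below (the surrounding Function built in getQueryRunnerForSegments is assumed from that file):

    // For one SegmentDescriptor of the incoming query: find the FireChief serving that
    // partition and, if present, hand it the query narrowed to just this segment.
    public QueryRunner<T> apply(SegmentDescriptor spec)
    {
      final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber());
      return retVal == null
             ? new NoopQueryRunner<T>()   // no chief for this partition: empty result
             : retVal.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(spec)));
    }

The new testQueryWithMultipleSegmentSpec test below exercises exactly this path: a query over four specific segments across two partitions is expected to return per-partition results when routed through getQueryRunnerForSegments.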


@@ -400,6 +400,18 @@ public class QueryRunnerTestHelper
     };
   }

+  public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
+      QueryRunnerFactory<T, QueryType> factory,
+      String resourceFileName
+  )
+  {
+    return makeQueryRunner(
+        factory,
+        segmentId,
+        new IncrementalIndexSegment(TestIndex.makeRealtimeIndex(resourceFileName), segmentId)
+    );
+  }
+
   public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
       QueryRunnerFactory<T, QueryType> factory,
       Segment adapter


@@ -163,7 +163,7 @@ public class TestIndex
     }
   }

-  private static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
+  public static IncrementalIndex makeRealtimeIndex(final String resourceFilename)
   {
     final URL resource = TestIndex.class.getClassLoader().getResource(resourceFilename);
     if (resource == null) {


@@ -44,6 +44,7 @@ import io.druid.query.QueryRunnerFactoryConglomerate;
 import io.druid.query.QuerySegmentWalker;
 import io.druid.query.QueryToolChest;
 import io.druid.query.SegmentDescriptor;
+import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.plumber.Committers;
@@ -192,7 +193,9 @@ public class RealtimeManager implements QuerySegmentWalker
                   public QueryRunner<T> apply(SegmentDescriptor spec)
                   {
                     final FireChief retVal = partitionChiefs.get(spec.getPartitionNumber());
-                    return retVal == null ? new NoopQueryRunner<T>() : retVal.getQueryRunner(query);
+                    return retVal == null
+                           ? new NoopQueryRunner<T>()
+                           : retVal.getQueryRunner(query.withQuerySegmentSpec(new SpecificSegmentSpec(spec)));
                   }
                 }
             )


@@ -20,12 +20,15 @@
 package io.druid.segment.realtime;

 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.metamx.common.Granularity;
 import com.metamx.common.ISE;
 import com.metamx.common.parsers.ParseException;
@@ -40,6 +43,7 @@ import io.druid.data.input.Row;
 import io.druid.data.input.impl.InputRowParser;
 import io.druid.granularity.QueryGranularity;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.BaseQuery;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactory;
@@ -58,6 +62,10 @@ import io.druid.query.groupby.GroupByQueryEngine;
 import io.druid.query.groupby.GroupByQueryQueryToolChest;
 import io.druid.query.groupby.GroupByQueryRunnerFactory;
 import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import io.druid.query.spec.MultipleIntervalSegmentSpec;
+import io.druid.query.spec.MultipleSpecificSegmentSpec;
+import io.druid.query.spec.SpecificSegmentQueryRunner;
+import io.druid.query.spec.SpecificSegmentSpec;
 import io.druid.segment.TestHelper;
 import io.druid.segment.incremental.IndexSizeExceededException;
 import io.druid.segment.indexing.DataSchema;
@@ -75,6 +83,7 @@ import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;

 import java.io.IOException;
@@ -90,6 +99,8 @@ import java.util.concurrent.TimeUnit;
  */
 public class RealtimeManagerTest
 {
+  private static QueryRunnerFactory factory;
+
   private RealtimeManager realtimeManager;
   private RealtimeManager realtimeManager2;
   private RealtimeManager realtimeManager3;
@@ -97,8 +108,16 @@ public class RealtimeManagerTest
   private DataSchema schema2;
   private TestPlumber plumber;
   private TestPlumber plumber2;
-  private QueryRunnerFactory factory;
   private CountDownLatch chiefStartedLatch;
+  private RealtimeTuningConfig tuningConfig_0;
+  private RealtimeTuningConfig tuningConfig_1;
+  private DataSchema schema3;
+
+  @BeforeClass
+  public static void setupStatic()
+  {
+    factory = initFactory();
+  }

   @Before
   public void setUp() throws Exception
@@ -208,31 +227,7 @@ public class RealtimeManagerTest
         null
     );

-    factory = initFactory();
-
-    RealtimeIOConfig ioConfig3 = new RealtimeIOConfig(
-        new FirehoseFactory()
-        {
-          @Override
-          public Firehose connect(InputRowParser parser) throws IOException
-          {
-            return new InfiniteTestFirehose();
-          }
-        },
-        new PlumberSchool()
-        {
-          @Override
-          public Plumber findPlumber(
-              DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
-          )
-          {
-            return plumber;
-          }
-        },
-        null
-    );
-
-    RealtimeTuningConfig tuningConfig_0 = new RealtimeTuningConfig(
+    tuningConfig_0 = new RealtimeTuningConfig(
         1,
         new Period("P1Y"),
         null,
@@ -248,7 +243,7 @@ public class RealtimeManagerTest
         null
     );

-    RealtimeTuningConfig tuningConfig_1 = new RealtimeTuningConfig(
+    tuningConfig_1 = new RealtimeTuningConfig(
         1,
         new Period("P1Y"),
         null,
@@ -264,7 +259,7 @@ public class RealtimeManagerTest
         null
     );

-    DataSchema schema2 = new DataSchema(
+    schema3 = new DataSchema(
         "testing",
         null,
         new AggregatorFactory[]{new CountAggregatorFactory("ignore")},
@@ -272,8 +267,8 @@ public class RealtimeManagerTest
         jsonMapper
     );

-    FireDepartment department_0 = new FireDepartment(schema2, ioConfig3, tuningConfig_0);
-    FireDepartment department_1 = new FireDepartment(schema2, ioConfig3, tuningConfig_1);
+    FireDepartment department_0 = new FireDepartment(schema3, ioConfig, tuningConfig_0);
+    FireDepartment department_1 = new FireDepartment(schema3, ioConfig2, tuningConfig_1);

     QueryRunnerFactoryConglomerate conglomerate = new QueryRunnerFactoryConglomerate()
     {
@@ -379,7 +374,7 @@ public class RealtimeManagerTest
     Assert.assertEquals(0, plumber2.getPersistCount());
   }

-  @Test(timeout = 5_000L)
+  @Test(timeout = 10_000L)
   public void testQueryWithInterval() throws IOException, InterruptedException
   {
     List<Row> expectedResults = Arrays.asList(
@@ -407,7 +402,6 @@ public class RealtimeManagerTest
     chiefStartedLatch.await();

     for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
-      plumber.setRunner(runner);
       GroupByQuery query = GroupByQuery
           .builder()
           .setDataSource(QueryRunnerTestHelper.dataSource)
@@ -421,6 +415,8 @@ public class RealtimeManagerTest
           )
           .setGranularity(QueryRunnerTestHelper.dayGran)
           .build();
+      plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner));
+      plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner));

       Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
           factory,
@@ -436,7 +432,7 @@ public class RealtimeManagerTest
   }

-  @Test(timeout = 5_000L)
+  @Test(timeout = 10_000L)
   public void testQueryWithSegmentSpec() throws IOException, InterruptedException
   {
     List<Row> expectedResults = Arrays.asList(
@@ -464,7 +460,6 @@ public class RealtimeManagerTest
     chiefStartedLatch.await();

     for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory) factory)) {
-      plumber.setRunner(runner);
       GroupByQuery query = GroupByQuery
           .builder()
           .setDataSource(QueryRunnerTestHelper.dataSource)
@@ -478,6 +473,8 @@ public class RealtimeManagerTest
           )
           .setGranularity(QueryRunnerTestHelper.dayGran)
           .build();
+      plumber.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner));
+      plumber2.setRunners(ImmutableMap.of(query.getIntervals().get(0), runner));

       Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
           factory,
@@ -512,7 +509,151 @@ public class RealtimeManagerTest
   }

-  private GroupByQueryRunnerFactory initFactory()
+  @Test(timeout = 10_000L)
+  public void testQueryWithMultipleSegmentSpec() throws IOException, InterruptedException
+  {
+    List<Row> expectedResults_both_partitions = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 2L, "idx", 260L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 2L, "idx", 236L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 4L, "idx", 4556L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 2L, "idx", 284L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 2L, "idx", 202L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 2L, "idx", 288L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 2L, "idx", 326L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 2L, "idx", 312L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 2L, "idx", 248L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 2L, "idx", 326L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 2L, "idx", 262L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 6L, "idx", 5126L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 2L, "idx", 254L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 6L, "idx", 5276L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 2L, "idx", 206L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 2L, "idx", 260L)
+    );
+
+    List<Row> expectedResults_single_partition_26_28 = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 1L, "idx", 130L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 1L, "idx", 118L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 2L, "idx", 2278L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 1L, "idx", 142L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 1L, "idx", 101L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 1L, "idx", 144L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 1L, "idx", 163L)
+    );
+
+    List<Row> expectedResults_single_partition_28_29 = Arrays.asList(
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 1L, "idx", 156L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 1L, "idx", 124L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 1L, "idx", 163L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 1L, "idx", 131L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 3L, "idx", 2563L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 1L, "idx", 127L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 3L, "idx", 2638L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 1L, "idx", 103L),
+        GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L)
+    );
+
+    chiefStartedLatch.await();
+
+    final Interval interval_26_28 = new Interval("2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z");
+    final Interval interval_28_29 = new Interval("2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z");
+    final SegmentDescriptor descriptor_26_28_0 = new SegmentDescriptor(interval_26_28, "ver0", 0);
+    final SegmentDescriptor descriptor_28_29_0 = new SegmentDescriptor(interval_28_29, "ver1", 0);
+    final SegmentDescriptor descriptor_26_28_1 = new SegmentDescriptor(interval_26_28, "ver0", 1);
+    final SegmentDescriptor descriptor_28_29_1 = new SegmentDescriptor(interval_28_29, "ver1", 1);
+
+    GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.dataSource)
+        .setQuerySegmentSpec(
+            new MultipleSpecificSegmentSpec(
+                ImmutableList.<SegmentDescriptor>of(
+                    descriptor_26_28_0,
+                    descriptor_28_29_0,
+                    descriptor_26_28_1,
+                    descriptor_28_29_1
+                )
+            )
+        )
+        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
+        .setAggregatorSpecs(
+            Arrays.asList(
+                QueryRunnerTestHelper.rowsCount,
+                new LongSumAggregatorFactory("idx", "index")
+            )
+        )
+        .setGranularity(QueryRunnerTestHelper.dayGran)
+        .build();
+
+    final Map<Interval, QueryRunner> runnerMap = ImmutableMap.<Interval, QueryRunner>of(
+        interval_26_28,
+        QueryRunnerTestHelper.makeQueryRunner(
+            factory,
+            "druid.sample.tsv.top"
+        ),
+        interval_28_29,
+        QueryRunnerTestHelper.makeQueryRunner(
+            factory,
+            "druid.sample.tsv.bottom"
+        )
+    );
+    plumber.setRunners(runnerMap);
+    plumber2.setRunners(runnerMap);
+
+    Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
+        factory,
+        query.getQuerySegmentSpec().lookup(query, realtimeManager3),
+        query
+    );
+    TestHelper.assertExpectedObjects(expectedResults_both_partitions, results, "");
+
+    results = GroupByQueryRunnerTestHelper.runQuery(
+        factory,
+        realtimeManager3.getQueryRunnerForSegments(
+            query,
+            ImmutableList.<SegmentDescriptor>of(descriptor_26_28_0)
+        ),
+        query
+    );
+    TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, "");
+
+    results = GroupByQueryRunnerTestHelper.runQuery(
+        factory,
+        realtimeManager3.getQueryRunnerForSegments(
+            query,
+            ImmutableList.<SegmentDescriptor>of(descriptor_28_29_0)
+        ),
+        query
+    );
+    TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, "");
+
+    results = GroupByQueryRunnerTestHelper.runQuery(
+        factory,
+        realtimeManager3.getQueryRunnerForSegments(
+            query,
+            ImmutableList.<SegmentDescriptor>of(descriptor_26_28_1)
+        ),
+        query
+    );
+    TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, "");
+
+    results = GroupByQueryRunnerTestHelper.runQuery(
+        factory,
+        realtimeManager3.getQueryRunnerForSegments(
+            query,
+            ImmutableList.<SegmentDescriptor>of(descriptor_28_29_1)
+        ),
+        query
+    );
+    TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, "");
+  }
+
+  private static GroupByQueryRunnerFactory initFactory()
   {
     final ObjectMapper mapper = new DefaultObjectMapper();
     final StupidPool<ByteBuffer> pool = new StupidPool<>(
@@ -773,7 +914,7 @@ public class RealtimeManagerTest
     private volatile boolean finishedJob = false;
     private volatile int persistCount = 0;
-    private QueryRunner runner;
+    private Map<Interval, QueryRunner> runners;

     private TestPlumber(Sink sink)
     {
@@ -826,13 +967,45 @@
       return null;
     }

+    @SuppressWarnings("unchecked")
     @Override
-    public <T> QueryRunner<T> getQueryRunner(Query<T> query)
+    public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
     {
-      if (runner == null) {
+      if (runners == null) {
         throw new UnsupportedOperationException();
       }
-      return runner;
+
+      final BaseQuery baseQuery = (BaseQuery) query;
+
+      if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) {
+        return factory.getToolchest()
+                      .mergeResults(
+                          factory.mergeRunners(
+                              MoreExecutors.sameThreadExecutor(),
+                              Iterables.transform(
+                                  baseQuery.getIntervals(),
+                                  new Function<Interval, QueryRunner<T>>()
+                                  {
+                                    @Override
+                                    public QueryRunner<T> apply(Interval input)
+                                    {
+                                      return runners.get(input);
+                                    }
+                                  }
+                              )
+                          )
+                      );
+      }
+
+      Assert.assertEquals(1, query.getIntervals().size());
+      final SegmentDescriptor descriptor =
+          ((SpecificSegmentSpec) ((BaseQuery) query).getQuerySegmentSpec()).getDescriptor();
+      return new SpecificSegmentQueryRunner<T>(
+          runners.get(descriptor.getInterval()),
+          new SpecificSegmentSpec(descriptor)
+      );
     }

     @Override
@@ -847,9 +1020,9 @@
       finishedJob = true;
     }

-    public void setRunner(QueryRunner runner)
+    public void setRunners(Map<Interval, QueryRunner> runners)
     {
-      this.runner = runner;
+      this.runners = runners;
     }
   }