Broker: Add ability to inline subqueries. (#9533)

* Broker: Add ability to inline subqueries.

The main changes:

- ClientQuerySegmentWalker: Add ability to inline subqueries.
- Query: Add "getSubQueryId" and "withSubQueryId" methods.
- QueryMetrics: Add "subQueryId" dimension.
- ServerConfig: Add new "maxSubqueryRows" parameter, which is used by
  ClientQuerySegmentWalker to limit how many rows can be inlined per
  query.
- IndexedTableJoinMatcher: Allow creating keys on top of columns with
  unknown types by assuming they are strings. This is useful because not
  all field types are known for query results.
- InlineDataSource: Store a RowSignature rather than its component parts
  (column names and types). Add more zealous "equals" and "hashCode"
  methods to ease testing. (A brief usage sketch follows these notes.)
- Moved QuerySegmentWalker test code from CalciteTests and
  SpecificSegmentsQuerySegmentWalker in druid-sql to QueryStackTests in
  druid-server. Use this to spin up a new ClientQuerySegmentWalkerTest.

* Adjustments from CI.

* Fix integration test.
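
Below is a minimal usage sketch (not part of the patch) of the refactored
InlineDataSource and RowSignature APIs, assuming they behave as shown in the
diffs that follow; the class and variable names in the sketch are illustrative
only. Note that ServerConfig backs the druid.server.http.* runtime properties,
so the new limit should surface as druid.server.http.maxSubqueryRows (default
100000 in the diff below).

import com.google.common.collect.ImmutableList;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;

import java.util.List;

public class InlineSubquerySketch
{
  public static void main(String[] args)
  {
    // A subquery's results are materialized as rows plus a RowSignature.
    // Column types may be unknown (null), which is why IndexedTableJoinMatcher
    // now falls back to treating unknown key types as strings.
    final RowSignature signature = RowSignature.builder()
                                               .add("k", ValueType.STRING)
                                               .add("v", ValueType.LONG)
                                               .build();

    final List<Object[]> rows = ImmutableList.of(
        new Object[]{"foo", 1L},
        new Object[]{"bar", 2L}
    );

    // The refactored InlineDataSource is built from rows and a signature
    // directly, rather than from separate column-name and column-type lists.
    final InlineDataSource inlined = InlineDataSource.fromIterable(rows, signature);

    // ClientQuerySegmentWalker tags each inlined subquery with a generated id
    // via Query#withSubQueryId, so QueryMetrics implementations can emit a
    // "subQueryId" dimension (the default implementation emits nothing).
    System.out.println(inlined.getRowSignature());
  }
}

Materializing subquery results into a List keeps the row-limit check simple:
ClientQuerySegmentWalker accumulates the row count across all inlined
subqueries of a query and throws ResourceLimitExceededException once the
configured limit is exceeded.
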
Gian Merlino 2020-03-18 15:06:45 -07:00 committed by GitHub
parent 4c620b8f1c
commit 1ef25a438f
51 changed files with 2103 additions and 731 deletions

View File

@ -26,13 +26,13 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.NoopEscalator;
@ -183,20 +183,19 @@ public class SqlBenchmark
log.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerate = CalciteTests.createQueryRunnerFactoryConglomerate();
closer.register(conglomerate.rhs);
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
final SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate.lhs).add(
final SpecificSegmentsQuerySegmentWalker walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(
dataSegment,
index
);
closer.register(walker);
final SchemaPlus rootSchema =
CalciteTests.createMockRootSchema(conglomerate.lhs, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
plannerFactory = new PlannerFactory(
rootSchema,
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate.lhs),
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
CalciteTests.createOperatorTable(),
CalciteTests.createExprMacroTable(),
plannerConfig,

View File

@ -26,7 +26,6 @@ import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
@ -39,6 +38,7 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.NoopEscalator;
@ -110,9 +110,7 @@ public class SqlVsNativeBenchmark
log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs;
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
final PlannerConfig plannerConfig = new PlannerConfig();
this.walker = closer.register(new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index));

View File

@ -107,6 +107,11 @@
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>

View File

@ -39,6 +39,7 @@ import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -198,6 +199,19 @@ public class MaterializedViewQuery<T> implements Query<T>
return query.getId();
}
@Override
public Query<T> withSubQueryId(String subQueryId)
{
return new MaterializedViewQuery<>(query.withSubQueryId(subQueryId), optimizer);
}
@Nullable
@Override
public String getSubQueryId()
{
return query.getSubQueryId();
}
@Override
public MaterializedViewQuery withDataSource(DataSource dataSource)
{

View File

@ -25,7 +25,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -45,6 +44,7 @@ import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -88,10 +88,8 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass
@ -457,7 +455,8 @@ public class TDigestSketchSqlAggregatorTest extends CalciteTestBase
),
new TDigestSketchAggregatorFactory("a2:agg", "m1",
300
)))
)
))
.postAggregators(
new TDigestSketchToQuantilePostAggregator("a0", makeFieldAccessPostAgg("a0:agg"), 0.0f),
new TDigestSketchToQuantilePostAggregator("a1", makeFieldAccessPostAgg("a1:agg"), 0.5f),

View File

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
@ -59,6 +58,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -105,10 +105,8 @@ public class HllSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
@ -63,6 +62,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -105,10 +105,8 @@ public class DoublesSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
@ -56,6 +55,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -101,10 +101,8 @@ public class ThetaSketchSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -41,7 +41,6 @@ import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.BloomFilterSerializersModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -64,6 +63,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -124,10 +124,8 @@ public class BloomFilterSqlAggregatorTest extends InitializedNullHandlingTest
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -54,6 +53,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -96,10 +96,8 @@ public class FixedBucketsHistogramQuantileSqlAggregatorTest extends CalciteTestB
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -53,6 +52,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -95,10 +95,8 @@ public class QuantileSqlAggregatorTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -34,7 +34,6 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Druids;
@ -50,6 +49,7 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.sql.SqlLifecycle;
@ -89,10 +89,8 @@ public class VarianceSqlAggregatorTest extends InitializedNullHandlingTest
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass

View File

@ -1779,5 +1779,157 @@
}
}
]
},
{
"description": "groupBy on topN subquery on inline",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "topN",
"dataSource": {
"type": "inline",
"columnNames": [
"k",
"v"
],
"columnTypes": [
"string",
"string"
],
"rows": [
[
"Wikipedia:Vandalismusmeldung",
"inline!"
]
]
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimension": {
"type": "default",
"dimension": "k",
"outputName": "d"
},
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"metric": "rows",
"threshold": 3,
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
}
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimensions": [
"d",
{
"type": "default",
"dimension": "rows",
"outputName": "rows",
"outputType": "long"
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"d": "Wikipedia:Vandalismusmeldung",
"rows": 1
}
}
]
},
{
"description": "groupBy on topN subquery on table",
"query": {
"queryType": "groupBy",
"dataSource": {
"type": "query",
"query": {
"queryType": "topN",
"dataSource": "wikipedia_editstream",
"intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"],
"granularity": "all",
"aggregations": [
{
"type": "count",
"name": "rows"
}
],
"dimension": {"type": "default", "dimension": "page", "outputName": "d"},
"metric": "rows",
"threshold": 3,
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
}
},
"intervals": [
"0000-01-01T00:00:00.000/3000-01-01T00:00:00.000"
],
"granularity": "all",
"dimensions": [
"d",
{
"type": "default",
"dimension": "rows",
"outputName": "rows",
"outputType": "long"
}
],
"context": {
"useCache": "true",
"populateCache": "true",
"timeout": 360000
}
},
"expectedResults": [
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"d": "Wikipedia:Administrator_intervention_against_vandalism",
"rows": 800
}
},
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"d": "Wikipedia:Administrators'_noticeboard/Incidents",
"rows": 990
}
},
{
"version": "v1",
"timestamp": "0000-01-01T00:00:00.000Z",
"event": {
"d": "Wikipedia:Vandalismusmeldung",
"rows": 991
}
}
]
}
]

View File

@ -54,6 +54,7 @@ public abstract class BaseQuery<T> implements Query<T>
}
public static final String QUERY_ID = "queryId";
public static final String SUB_QUERY_ID = "subQueryId";
public static final String SQL_QUERY_ID = "sqlQueryId";
private final DataSource dataSource;
private final boolean descending;
@ -232,6 +233,19 @@ public abstract class BaseQuery<T> implements Query<T>
return (String) getContextValue(QUERY_ID);
}
@Override
public Query<T> withSubQueryId(String subQueryId)
{
return withOverriddenContext(ImmutableMap.of(SUB_QUERY_ID, subQueryId));
}
@Nullable
@Override
public String getSubQueryId()
{
return (String) getContextValue(SUB_QUERY_ID);
}
@Override
public Query withId(String id)
{

View File

@ -87,6 +87,7 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
hasFilters(query);
duration(query);
queryId(query);
subQueryId(query);
sqlQueryId(query);
}
@ -130,6 +131,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
setDimension(DruidMetrics.ID, StringUtils.nullToEmptyNonDruidDataString(query.getId()));
}
@Override
public void subQueryId(QueryType query)
{
// Emit nothing by default.
}
@Override
public void sqlQueryId(QueryType query)
{

View File

@ -24,6 +24,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class FluentQueryRunnerBuilder<T>
{
@ -96,5 +97,10 @@ public class FluentQueryRunnerBuilder<T>
{
return from(toolChest.mergeResults(baseRunner));
}
public FluentQueryRunner map(final Function<QueryRunner<T>, QueryRunner<T>> mapFn)
{
return from(mapFn.apply(baseRunner));
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -30,12 +31,15 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Represents an inline datasource, where the rows are embedded within the DataSource object itself.
@ -45,23 +49,16 @@ import java.util.function.ToLongFunction;
*/
public class InlineDataSource implements DataSource
{
private final List<String> columnNames;
private final List<ValueType> columnTypes;
private final Iterable<Object[]> rows;
private final RowSignature signature;
private InlineDataSource(
final List<String> columnNames,
final List<ValueType> columnTypes,
final Iterable<Object[]> rows
final Iterable<Object[]> rows,
final RowSignature signature
)
{
this.columnNames = Preconditions.checkNotNull(columnNames, "'columnNames' must be nonnull");
this.columnTypes = Preconditions.checkNotNull(columnTypes, "'columnTypes' must be nonnull");
this.rows = Preconditions.checkNotNull(rows, "'rows' must be nonnull");
if (columnNames.size() != columnTypes.size()) {
throw new IAE("columnNames and columnTypes must be the same length");
}
this.signature = Preconditions.checkNotNull(signature, "'signature' must be nonnull");
}
/**
@ -75,24 +72,36 @@ public class InlineDataSource implements DataSource
@JsonProperty("rows") List<Object[]> rows
)
{
return new InlineDataSource(columnNames, columnTypes, rows);
Preconditions.checkNotNull(columnNames, "'columnNames' must be nonnull");
if (columnTypes != null && columnNames.size() != columnTypes.size()) {
throw new IAE("columnNames and columnTypes must be the same length");
}
final RowSignature.Builder builder = RowSignature.builder();
for (int i = 0; i < columnNames.size(); i++) {
final String name = columnNames.get(i);
final ValueType type = columnTypes != null ? columnTypes.get(i) : null;
builder.add(name, type);
}
return new InlineDataSource(rows, builder.build());
}
/**
* Creates an inline datasource from an Iterable. The Iterable will not be iterated until someone calls
* {@link #getRows()} and iterates the result, or until someone calls {@link #getRowsAsList()}.
*
* @param columnNames names of each column in the rows
* @param columnTypes types of each column in the rows
* @param rows rows, each of the same length as columnNames and columnTypes
* @param rows rows, each of the same length as {@code signature.size()}
* @param signature row signature
*/
public static InlineDataSource fromIterable(
final List<String> columnNames,
final List<ValueType> columnTypes,
final Iterable<Object[]> rows
final Iterable<Object[]> rows,
final RowSignature signature
)
{
return new InlineDataSource(columnNames, columnTypes, rows);
return new InlineDataSource(rows, signature);
}
@Override
@ -104,13 +113,21 @@ public class InlineDataSource implements DataSource
@JsonProperty
public List<String> getColumnNames()
{
return columnNames;
return signature.getColumnNames();
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<ValueType> getColumnTypes()
{
return columnTypes;
if (IntStream.range(0, signature.size()).noneMatch(i -> signature.getColumnType(i).isPresent())) {
// All types are null; return null for columnTypes so it doesn't show up in serialized JSON.
return null;
} else {
return IntStream.range(0, signature.size())
.mapToObj(i -> signature.getColumnType(i).orElse(null))
.collect(Collectors.toList());
}
}
/**
@ -166,15 +183,13 @@ public class InlineDataSource implements DataSource
return true;
}
/**
* Returns the row signature (map of column name to type) for this inline datasource. Note that types may
* be null, meaning we know we have a column with a certain name, but we don't know what its type is.
*/
public RowSignature getRowSignature()
{
final RowSignature.Builder builder = RowSignature.builder();
for (int i = 0; i < columnNames.size(); i++) {
builder.add(columnNames.get(i), columnTypes.get(i));
}
return builder.build();
return signature;
}
public RowAdapter<Object[]> rowAdapter()
@ -184,7 +199,7 @@ public class InlineDataSource implements DataSource
@Override
public ToLongFunction<Object[]> timestampFunction()
{
final int columnNumber = columnNames.indexOf(ColumnHolder.TIME_COLUMN_NAME);
final int columnNumber = signature.indexOf(ColumnHolder.TIME_COLUMN_NAME);
if (columnNumber >= 0) {
return row -> (long) row[columnNumber];
@ -196,7 +211,7 @@ public class InlineDataSource implements DataSource
@Override
public Function<Object[], Object> columnFunction(String columnName)
{
final int columnNumber = columnNames.indexOf(columnName);
final int columnNumber = signature.indexOf(columnName);
if (columnNumber >= 0) {
return row -> row[columnNumber];
@ -217,15 +232,14 @@ public class InlineDataSource implements DataSource
return false;
}
InlineDataSource that = (InlineDataSource) o;
return Objects.equals(columnNames, that.columnNames) &&
Objects.equals(columnTypes, that.columnTypes) &&
Objects.equals(rows, that.rows);
return rowsEqual(rows, that.rows) &&
Objects.equals(signature, that.signature);
}
@Override
public int hashCode()
{
return Objects.hash(columnNames, columnTypes, rows);
return Objects.hash(rowsHashCode(rows), signature);
}
@Override
@ -233,8 +247,56 @@ public class InlineDataSource implements DataSource
{
// Don't include 'rows' in stringification, because it might be long and/or lazy.
return "InlineDataSource{" +
"columnNames=" + columnNames +
", columnTypes=" + columnTypes +
"signature=" + signature +
'}';
}
/**
* A very zealous equality checker for "rows" that respects deep equality of arrays, but nevertheless refrains
* from materializing things needlessly. Useful for unit tests that want to compare equality of different
* InlineDataSource instances.
*/
private static boolean rowsEqual(final Iterable<Object[]> rowsA, final Iterable<Object[]> rowsB)
{
if (rowsA instanceof List && rowsB instanceof List) {
final List<Object[]> listA = (List<Object[]>) rowsA;
final List<Object[]> listB = (List<Object[]>) rowsB;
if (listA.size() != listB.size()) {
return false;
}
for (int i = 0; i < listA.size(); i++) {
final Object[] rowA = listA.get(i);
final Object[] rowB = listB.get(i);
if (!Arrays.equals(rowA, rowB)) {
return false;
}
}
return true;
} else {
return Objects.equals(rowsA, rowsB);
}
}
/**
* A very zealous hash code computer for "rows" that is compatible with {@link #rowsEqual}.
*/
private static int rowsHashCode(final Iterable<Object[]> rows)
{
if (rows instanceof List) {
final List<Object[]> list = (List<Object[]>) rows;
int code = 1;
for (final Object[] row : list) {
code = 31 * code + Arrays.hashCode(row);
}
return code;
} else {
return Objects.hash(rows);
}
}
}

View File

@ -131,6 +131,11 @@ public interface Query<T>
@Nullable
String getId();
Query<T> withSubQueryId(String subQueryId);
@Nullable
String getSubQueryId();
default Query<T> withSqlQueryId(String sqlQueryId)
{
return this;

View File

@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
@PublicApi
public class QueryContexts
{
public static final String FINALIZE_KEY = "finalize";
public static final String PRIORITY_KEY = "priority";
public static final String LANE_KEY = "lane";
public static final String TIMEOUT_KEY = "timeout";
@ -164,7 +165,7 @@ public class QueryContexts
public static <T> boolean isFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);
return parseBoolean(query, FINALIZE_KEY, defaultValue);
}
public static <T> boolean isSerializeDateTimeAsLong(Query<T> query, boolean defaultValue)

View File

@ -202,6 +202,12 @@ public interface QueryMetrics<QueryType extends Query<?>>
@PublicApi
void queryId(QueryType query);
/**
* Sets {@link Query#getSubQueryId()} of the given query as dimension.
*/
@PublicApi
void subQueryId(QueryType query);
/**
* Sets {@link Query#getSqlQueryId()} of the given query as dimension
*/

View File

@ -87,6 +87,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
throw new ISE("Unsupported method in default query metrics implementation.");
}
@Override
public void subQueryId(SearchQuery query)
{
throw new ISE("Unsupported method in default query metrics implementation.");
}
@Override
public void sqlQueryId(SearchQuery query)
{

View File

@ -32,6 +32,7 @@ import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
@ -162,6 +163,19 @@ public class SelectQuery implements Query<Object>
throw new RuntimeException(REMOVED_ERROR_MESSAGE);
}
@Override
public Query<Object> withSubQueryId(String subQueryId)
{
throw new RuntimeException(REMOVED_ERROR_MESSAGE);
}
@Nullable
@Override
public String getSubQueryId()
{
throw new RuntimeException(REMOVED_ERROR_MESSAGE);
}
@Override
public Query<Object> withDataSource(DataSource dataSource)
{

View File

@ -48,7 +48,7 @@ import java.util.Objects;
@JsonTypeName("timeseries")
public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
{
static final String CTX_GRAND_TOTAL = "grandTotal";
public static final String CTX_GRAND_TOTAL = "grandTotal";
public static final String SKIP_EMPTY_BUCKETS = "skipEmptyBuckets";
private final VirtualColumns virtualColumns;

View File

@ -20,6 +20,7 @@
package org.apache.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.math.expr.Expr;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -121,6 +122,8 @@ public class ColumnProcessors
final ColumnSelectorFactory selectorFactory
)
{
Preconditions.checkNotNull(exprTypeHint, "'exprTypeHint' must be nonnull");
if (expr.getBindingIfIdentifier() != null) {
// If expr is an identifier, treat this the same way as a direct column reference.
return makeProcessor(expr.getBindingIfIdentifier(), processorFactory, selectorFactory);

View File

@ -21,6 +21,7 @@ package org.apache.druid.segment.column;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.ISE;
import javax.annotation.Nullable;
@ -65,7 +66,7 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
public ColumnCapabilitiesImpl setType(ValueType type)
{
this.type = type;
this.type = Preconditions.checkNotNull(type, "'type' must be nonnull");
return this;
}

View File

@ -36,7 +36,7 @@ public class MapJoinableFactory implements JoinableFactory
private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories;
@Inject
MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
{
// Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
// Class doesn't override Object.equals().

View File

@ -30,7 +30,6 @@ import it.unimi.dsi.fastutil.ints.IntRBTreeSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;
import org.apache.druid.segment.BaseFloatColumnValueSelector;
import org.apache.druid.segment.BaseLongColumnValueSelector;
@ -62,6 +61,9 @@ public class IndexedTableJoinMatcher implements JoinMatcher
{
private static final int UNINITIALIZED_CURRENT_ROW = -1;
// Key column type to use when the actual key type is unknown.
static final ValueType DEFAULT_KEY_TYPE = ValueType.STRING;
private final IndexedTable table;
private final List<Supplier<IntIterator>> conditionMatchers;
private final IntIterator[] currentMatchedRows;
@ -125,16 +127,15 @@ public class IndexedTableJoinMatcher implements JoinMatcher
final int keyColumnNumber = table.rowSignature().indexOf(condition.getRightColumn());
final ValueType keyColumnType =
table.rowSignature().getColumnType(condition.getRightColumn())
.orElseThrow(() -> new ISE("Encountered null type for column[%s]", condition.getRightColumn()));
final ValueType keyType =
table.rowSignature().getColumnType(condition.getRightColumn()).orElse(DEFAULT_KEY_TYPE);
final IndexedTable.Index index = table.columnIndex(keyColumnNumber);
return ColumnProcessors.makeProcessor(
condition.getLeftExpr(),
keyColumnType,
new ConditionMatcherFactory(keyColumnType, index),
keyType,
new ConditionMatcherFactory(keyType, index),
selectorFactory
);
}

View File

@ -83,9 +83,8 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
final Map<Object, IntList> m;
if (keyColumns.contains(column)) {
final ValueType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Key column[%s] must have nonnull type", column));
final ValueType keyType =
rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
final Function<RowType, Object> columnFunction = columnFunctions.get(i);
@ -93,7 +92,7 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
for (int j = 0; j < table.size(); j++) {
final RowType row = table.get(j);
final Object key = DimensionHandlerUtils.convertObjectToType(columnFunction.apply(row), columnType);
final Object key = DimensionHandlerUtils.convertObjectToType(columnFunction.apply(row), keyType);
if (key != null) {
final IntList array = m.computeIfAbsent(key, k -> new IntArrayList());
array.add(j);
@ -129,8 +128,7 @@ public class RowBasedIndexedTable<RowType> implements IndexedTable
}
final ValueType columnType =
rowSignature.getColumnType(column)
.orElseThrow(() -> new ISE("Key column[%s] must have nonnull type", column));
rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE);
return key -> {
final Object convertedKey = DimensionHandlerUtils.convertObjectToType(key, columnType, false);

View File

@ -71,17 +71,24 @@ public class InlineDataSourceTest
ValueType.COMPLEX
);
private final InlineDataSource listDataSource = InlineDataSource.fromIterable(
expectedColumnNames,
expectedColumnTypes,
rows
);
private final RowSignature expectedRowSignature;
private final InlineDataSource iterableDataSource = InlineDataSource.fromIterable(
expectedColumnNames,
expectedColumnTypes,
rowsIterable
);
private final InlineDataSource listDataSource;
private final InlineDataSource iterableDataSource;
public InlineDataSourceTest()
{
final RowSignature.Builder builder = RowSignature.builder();
for (int i = 0; i < expectedColumnNames.size(); i++) {
builder.add(expectedColumnNames.get(i), expectedColumnTypes.get(i));
}
expectedRowSignature = builder.build();
listDataSource = InlineDataSource.fromIterable(rows, expectedRowSignature);
iterableDataSource = InlineDataSource.fromIterable(rowsIterable, expectedRowSignature);
}
@Test
public void test_getTableNames()
@ -222,7 +229,7 @@ public class InlineDataSourceTest
{
EqualsVerifier.forClass(InlineDataSource.class)
.usingGetClass()
.withNonnullFields("columnNames", "columnTypes", "rows")
.withNonnullFields("rows", "signature")
.verify();
}
@ -245,6 +252,7 @@ public class InlineDataSourceTest
Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames());
Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes());
Assert.assertEquals(listDataSource.getRowSignature(), deserialized.getRowSignature());
assertRowsEqual(listDataSource.getRows(), deserialized.getRows());
}
@ -260,12 +268,38 @@ public class InlineDataSourceTest
// Lazy iterables turn into Lists upon serialization.
Assert.assertEquals(listDataSource.getColumnNames(), deserialized.getColumnNames());
Assert.assertEquals(listDataSource.getColumnTypes(), deserialized.getColumnTypes());
Assert.assertEquals(listDataSource.getRowSignature(), deserialized.getRowSignature());
assertRowsEqual(listDataSource.getRows(), deserialized.getRows());
// Should have iterated once.
Assert.assertEquals(1, iterationCounter.get());
}
@Test
public void test_serde_untyped() throws Exception
{
// Create a row signature with no types set.
final RowSignature.Builder builder = RowSignature.builder();
for (String columnName : expectedRowSignature.getColumnNames()) {
builder.add(columnName, null);
}
final RowSignature untypedSignature = builder.build();
final InlineDataSource untypedDataSource = InlineDataSource.fromIterable(rows, untypedSignature);
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final InlineDataSource deserialized = (InlineDataSource) jsonMapper.readValue(
jsonMapper.writeValueAsString(untypedDataSource),
DataSource.class
);
Assert.assertEquals(untypedDataSource.getColumnNames(), deserialized.getColumnNames());
Assert.assertEquals(untypedDataSource.getColumnTypes(), deserialized.getColumnTypes());
Assert.assertEquals(untypedDataSource.getRowSignature(), deserialized.getRowSignature());
Assert.assertNull(deserialized.getColumnTypes());
assertRowsEqual(listDataSource.getRows(), deserialized.getRows());
}
/**
* This method exists because "equals" on two equivalent Object[] won't return true, so we need to check
* for equality deeply.

View File

@ -34,6 +34,7 @@ import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinType;
@ -52,9 +53,8 @@ public class DataSourceAnalysisTest
private static final TableDataSource TABLE_BAR = new TableDataSource("bar");
private static final LookupDataSource LOOKUP_LOOKYLOO = new LookupDataSource("lookyloo");
private static final InlineDataSource INLINE = InlineDataSource.fromIterable(
ImmutableList.of("column"),
ImmutableList.of(ValueType.STRING),
ImmutableList.of(new Object[0])
ImmutableList.of(new Object[0]),
RowSignature.builder().add("column", ValueType.STRING).build()
);
@Test

View File

@ -55,6 +55,8 @@ import java.util.stream.Collectors;
public class RowBasedStorageAdapterTest
{
private static final String UNKNOWN_TYPE_NAME = "unknownType";
private static final RowSignature ROW_SIGNATURE =
RowSignature.builder()
.add(ValueType.FLOAT.name(), ValueType.FLOAT)
@ -62,6 +64,7 @@ public class RowBasedStorageAdapterTest
.add(ValueType.LONG.name(), ValueType.LONG)
.add(ValueType.STRING.name(), ValueType.STRING)
.add(ValueType.COMPLEX.name(), ValueType.COMPLEX)
.add(UNKNOWN_TYPE_NAME, null)
.build();
private static final List<Function<Cursor, Supplier<Object>>> READ_STRING =
@ -186,6 +189,9 @@ public class RowBasedStorageAdapterTest
@Override
public Function<Integer, Object> columnFunction(String columnName)
{
if (UNKNOWN_TYPE_NAME.equals(columnName)) {
return i -> i;
} else {
final ValueType valueType = GuavaUtils.getEnumIfPresent(ValueType.class, columnName);
if (valueType == null || valueType == ValueType.COMPLEX) {
@ -194,6 +200,7 @@ public class RowBasedStorageAdapterTest
return i -> DimensionHandlerUtils.convertObjectToType(i, valueType);
}
}
}
};
private static RowBasedStorageAdapter<Integer> createIntAdapter(final int... ints)
@ -380,6 +387,15 @@ public class RowBasedStorageAdapterTest
Assert.assertFalse(capabilities.isComplete());
}
@Test
public void test_getColumnCapabilities_unknownType()
{
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(UNKNOWN_TYPE_NAME);
Assert.assertNull(capabilities);
}
@Test
public void test_getColumnCapabilities_nonexistent()
{
@ -393,9 +409,13 @@ public class RowBasedStorageAdapterTest
final RowBasedStorageAdapter<Integer> adapter = createIntAdapter(0, 1, 2);
for (String columnName : ROW_SIGNATURE.getColumnNames()) {
if (UNKNOWN_TYPE_NAME.equals(columnName)) {
Assert.assertNull(columnName, adapter.getColumnTypeName(columnName));
} else {
Assert.assertEquals(columnName, ValueType.valueOf(columnName).name(), adapter.getColumnTypeName(columnName));
}
}
}
@Test
public void test_getColumnTypeName_nonexistent()
@ -695,7 +715,14 @@ public class RowBasedStorageAdapterTest
NullHandling.defaultDoubleValue(),
NullHandling.defaultLongValue(),
null,
null
null,
// unknownType
0f,
0d,
0L,
"0",
0
),
Lists.newArrayList(
Intervals.ETERNITY.getStart(),
@ -733,7 +760,14 @@ public class RowBasedStorageAdapterTest
NullHandling.defaultDoubleValue(),
NullHandling.defaultLongValue(),
null,
null
null,
// unknownType
1f,
1d,
1L,
"1",
1
)
),
walkCursors(cursors, new ArrayList<>(PROCESSORS.values()))

View File

@ -31,6 +31,7 @@ import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.ConstantDimensionSelector;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinConditionAnalysis;
import org.apache.druid.segment.join.JoinMatcher;
@ -67,12 +68,11 @@ public class IndexedTableJoinableTest
};
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of("str", "long"),
ImmutableList.of(ValueType.STRING, ValueType.LONG),
ImmutableList.of(
new Object[]{"foo", 1L},
new Object[]{"bar", 2L}
)
),
RowSignature.builder().add("str", ValueType.STRING).add("long", ValueType.LONG).build()
);
private final RowBasedIndexedTable<Object[]> indexedTable = new RowBasedIndexedTable<>(

View File

@ -21,29 +21,54 @@ package org.apache.druid.server;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.PostProcessingOperator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.ResultLevelCachingQueryRunner;
import org.apache.druid.query.RetryQueryRunner;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Stack;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* Query handler for Broker processes (see CliBroker).
*
* This class is responsible for:
*
* 1) Running queries on the cluster using its 'clusterClient'
* 2) Running queries locally (when all datasources are global) using its 'localClient'
* 3) Inlining subqueries if necessary, in service of the above two goals
*/
public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
@ -109,43 +134,215 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
if (analysis.isConcreteTableBased()) {
return decorateClusterRunner(query, clusterClient.getQueryRunnerForIntervals(query, intervals));
} else if (analysis.isConcreteBased() && analysis.isGlobal()) {
// Concrete, non-table based, can run locally. No need to decorate since LocalQuerySegmentWalker does its own.
return localClient.getQueryRunnerForIntervals(query, intervals);
// First, do an inlining dry run to see if any inlining is necessary, without actually running the queries.
final DataSource inlineDryRun = inlineIfNecessary(query.getDataSource(), toolChest, new AtomicInteger(), true);
if (!canRunQueryUsingClusterWalker(query.withDataSource(inlineDryRun))
&& !canRunQueryUsingLocalWalker(query.withDataSource(inlineDryRun))) {
// Dry run didn't go well.
throw new ISE("Cannot handle subquery structure for dataSource: %s", query.getDataSource());
}
// Now that we know the structure is workable, actually do the inlining (if necessary).
final Query<T> newQuery = query.withDataSource(
inlineIfNecessary(
query.getDataSource(),
toolChest,
new AtomicInteger(),
false
)
);
if (canRunQueryUsingLocalWalker(newQuery)) {
// No need to decorate since LocalQuerySegmentWalker does its own.
return new QuerySwappingQueryRunner<>(
localClient.getQueryRunnerForIntervals(newQuery, intervals),
query,
newQuery
);
} else if (canRunQueryUsingClusterWalker(newQuery)) {
return new QuerySwappingQueryRunner<>(
decorateClusterRunner(newQuery, clusterClient.getQueryRunnerForIntervals(newQuery, intervals)),
query,
newQuery
);
} else {
// In the future, we will check here to see if parts of the query are inlinable, and if that inlining would
// be able to create a concrete table-based query that we can run through the distributed query stack.
throw new ISE("Query dataSource is not table-based, cannot run");
// We don't expect to ever get here, because the logic earlier in this method should have rejected any query
// that can't be run with either the local or cluster walkers. If this message ever shows up it is a bug.
throw new ISE("Inlined query could not be run");
}
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
// Inlining isn't done for segments-based queries.
if (analysis.isConcreteTableBased()) {
if (canRunQueryUsingClusterWalker(query)) {
return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs));
} else {
throw new ISE("Query dataSource is not table-based, cannot run");
// We don't expect end-users to see this message, since it only happens when specific segments are requested;
// this is not typical end-user behavior.
throw new ISE(
"Cannot run query on specific segments (must be table-based; outer query, if present, must be "
+ "handleable by the query toolchest natively)");
}
}
/**
* Checks if a query can be handled wholly by {@link #localClient}. Assumes that it is a
* {@link LocalQuerySegmentWalker} or something that behaves similarly.
*/
private <T> boolean canRunQueryUsingLocalWalker(Query<T> query)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
// 1) Must be based on a concrete datasource that is not a table.
// 2) Must be based on globally available data (so we have a copy here on the Broker).
// 3) If there is an outer query, it must be handleable by the query toolchest (the local walker does not handle
// subqueries on its own).
return analysis.isConcreteBased() && !analysis.isConcreteTableBased() && analysis.isGlobal()
&& (!analysis.isQuery()
|| toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
}
/**
* Checks if a query can be handled wholly by {@link #clusterClient}. Assumes that it is a
* {@link CachingClusteredClient} or something that behaves similarly.
*/
private <T> boolean canRunQueryUsingClusterWalker(Query<T> query)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
// 1) Must be based on a concrete table (the only shape the Druid cluster can handle).
// 2) If there is an outer query, it must be handleable by the query toolchest (the cluster walker does not handle
// subqueries on its own).
return analysis.isConcreteTableBased()
&& (!analysis.isQuery()
|| toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
}
/**
* Replace QueryDataSources with InlineDataSources when necessary and possible. "Necessary" is defined as:
*
* 1) For outermost subqueries: inlining is necessary if the toolchest cannot handle it.
* 2) For all other subqueries (e.g. those nested under a join): inlining is always necessary.
*
* @param dataSource datasource to process.
* @param toolChestIfOutermost if provided, and if the provided datasource is a {@link QueryDataSource}, this method
* will consider whether the toolchest can handle a subquery on the datasource using
* {@link QueryToolChest#canPerformSubquery}. If the toolchest can handle it, then it will
* not be inlined. See {@link org.apache.druid.query.groupby.GroupByQueryQueryToolChest}
* for an example of a toolchest that can handle subqueries.
* @param dryRun if true, does not actually execute any subqueries, but will inline empty result sets.
*/
@SuppressWarnings({"rawtypes", "unchecked"}) // Subquery, toolchest, runner handling all use raw types
private DataSource inlineIfNecessary(
final DataSource dataSource,
@Nullable final QueryToolChest toolChestIfOutermost,
final AtomicInteger subqueryRowLimitAccumulator,
final boolean dryRun
)
{
if (dataSource instanceof QueryDataSource) {
// This datasource is a subquery.
final Query subQuery = ((QueryDataSource) dataSource).getQuery();
final QueryToolChest toolChest = warehouse.getToolChest(subQuery);
if (toolChestIfOutermost != null && toolChestIfOutermost.canPerformSubquery(subQuery)) {
// Strip outer queries that are handleable by the toolchest, and inline subqueries that may be underneath
// them (e.g. subqueries nested under a join).
final Stack<DataSource> stack = new Stack<>();
DataSource current = dataSource;
while (current instanceof QueryDataSource) {
stack.push(current);
current = Iterables.getOnlyElement(current.getChildren());
}
assert !(current instanceof QueryDataSource);
current = inlineIfNecessary(current, null, subqueryRowLimitAccumulator, dryRun);
while (!stack.isEmpty()) {
current = stack.pop().withChildren(Collections.singletonList(current));
}
assert current instanceof QueryDataSource;
if (toolChest.canPerformSubquery(((QueryDataSource) current).getQuery())) {
return current;
} else {
// Something happened during inlining that means the toolchest is no longer able to handle this subquery.
// We need to consider inlining it.
return inlineIfNecessary(current, toolChestIfOutermost, subqueryRowLimitAccumulator, dryRun);
}
} else if (canRunQueryUsingLocalWalker(subQuery) || canRunQueryUsingClusterWalker(subQuery)) {
// Subquery needs to be inlined. Assign it a subquery id and run it.
final Query subQueryWithId = subQuery.withSubQueryId(UUID.randomUUID().toString());
final Sequence<?> queryResults;
if (dryRun) {
queryResults = Sequences.empty();
} else {
final QueryRunner subqueryRunner = subQueryWithId.getRunner(this);
queryResults = subqueryRunner.run(QueryPlus.wrap(subQueryWithId));
}
return toInlineDataSource(
subQueryWithId,
queryResults,
warehouse.getToolChest(subQueryWithId),
subqueryRowLimitAccumulator,
serverConfig.getMaxSubqueryRows()
);
} else {
// Cannot inline subquery. Attempt to inline one level deeper, and then try again.
return inlineIfNecessary(
dataSource.withChildren(
Collections.singletonList(
inlineIfNecessary(
Iterables.getOnlyElement(dataSource.getChildren()),
null,
subqueryRowLimitAccumulator,
dryRun
)
)
),
toolChestIfOutermost,
subqueryRowLimitAccumulator,
dryRun
);
}
} else {
// Not a query datasource. Walk children and see if there's anything to inline.
return dataSource.withChildren(
dataSource.getChildren()
.stream()
.map(child -> inlineIfNecessary(child, null, subqueryRowLimitAccumulator, dryRun))
.collect(Collectors.toList())
);
}
}
/**
* Decorate query runners created by {@link #clusterClient}, adding result caching, result merging, metric
* emission, etc. Not to be used on runners from {@link #localClient}, since we expect it to do this kind
* of decoration to itself.
*
* @param query the query
* @param baseClusterRunner runner from {@link #clusterClient}
*/
private <T> QueryRunner<T> decorateClusterRunner(Query<T> query, QueryRunner<T> baseClusterRunner)
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
query.<String>getContextValue("postProcessing"),
new TypeReference<PostProcessingOperator<T>>() {}
);
final QueryRunner<T> mostlyDecoratedRunner =
new FluentQueryRunnerBuilder<>(toolChest)
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new SetAndVerifyContextQueryRunner<>(
serverConfig,
@ -160,16 +357,100 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter)
.postProcess(postProcessing);
// This does not adhere to the fluent workflow. See https://github.com/apache/druid/issues/5517
return new ResultLevelCachingQueryRunner<>(
mostlyDecoratedRunner,
.postProcess(
objectMapper.convertValue(
query.<String>getContextValue("postProcessing"),
new TypeReference<PostProcessingOperator<T>>() {}
)
)
.map(
runner ->
new ResultLevelCachingQueryRunner<>(
runner,
toolChest,
query,
objectMapper,
cache,
cacheConfig
)
);
}
/**
* Convert the results of a particular query into a materialized (List-based) InlineDataSource.
*
* @param query the query
* @param results query results
* @param toolChest toolchest for the query
* @param limitAccumulator an accumulator for tracking the number of accumulated rows in all subqueries for a
* particular master query
* @param limit user-configured limit. If negative, will be treated as {@link Integer#MAX_VALUE}.
* If zero, this method will throw an error immediately.
*
* @throws ResourceLimitExceededException if the limit is exceeded
*/
private static <T, QueryType extends Query<T>> InlineDataSource toInlineDataSource(
final QueryType query,
final Sequence<T> results,
final QueryToolChest<T, QueryType> toolChest,
final AtomicInteger limitAccumulator,
final int limit
)
{
final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
if (limitAccumulator.get() >= limitToUse) {
throw new ResourceLimitExceededException("Cannot issue subquery, maximum[%d] reached", limitToUse);
}
final RowSignature signature = toolChest.resultArraySignature(query);
final List<Object[]> resultList = new ArrayList<>();
toolChest.resultsAsArrays(query, results).accumulate(
resultList,
(acc, in) -> {
if (limitAccumulator.getAndIncrement() >= limitToUse) {
throw new ResourceLimitExceededException(
"Subquery generated results beyond maximum[%d]",
limitToUse
);
}
acc.add(in);
return acc;
}
);
return InlineDataSource.fromIterable(resultList, signature);
}
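
To make the accounting concrete: the accumulator is shared by every subquery of a single master query, so a later subquery can trip the limit even if it is small on its own. Below is a standalone sketch using only JDK types (no Sequence or toolchest machinery), with IllegalStateException standing in for ResourceLimitExceededException.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class SubqueryRowLimitSketch
{
  // Mirrors the accounting above: a negative limit means "unlimited", and the shared counter is
  // checked both before and during materialization of each subquery's rows.
  static List<Object[]> materialize(Iterable<Object[]> rows, AtomicInteger limitAccumulator, int limit)
  {
    final int limitToUse = limit < 0 ? Integer.MAX_VALUE : limit;
    if (limitAccumulator.get() >= limitToUse) {
      throw new IllegalStateException("Cannot issue subquery, maximum[" + limitToUse + "] reached");
    }

    final List<Object[]> materialized = new ArrayList<>();
    for (Object[] row : rows) {
      if (limitAccumulator.getAndIncrement() >= limitToUse) {
        throw new IllegalStateException("Subquery generated results beyond maximum[" + limitToUse + "]");
      }
      materialized.add(row);
    }
    return materialized;
  }

  public static void main(String[] args)
  {
    final AtomicInteger accumulator = new AtomicInteger();

    // First subquery of the master query: 2 rows against a limit of 3, fine.
    materialize(Arrays.asList(new Object[]{"x"}, new Object[]{"y"}), accumulator, 3);

    // Second subquery: its second row pushes the shared total past 3 and fails.
    materialize(Arrays.asList(new Object[]{"a"}, new Object[]{"b"}), accumulator, 3);
  }
}
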
/**
* A {@link QueryRunner} which validates that a *specific* query is passed in, and then swaps it with another one.
* Useful since the inlining we do relies on passing the modified query to the underlying {@link QuerySegmentWalker},
* and callers of {@link #getQueryRunnerForIntervals} aren't able to do this themselves.
*/
private static class QuerySwappingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final Query<T> query;
private final Query<T> newQuery;
public QuerySwappingQueryRunner(QueryRunner<T> baseRunner, Query<T> query, Query<T> newQuery)
{
this.baseRunner = baseRunner;
this.query = query;
this.newQuery = newQuery;
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
//noinspection ObjectEquality
if (queryPlus.getQuery() != query) {
throw new ISE("Unexpected query received");
}
return baseRunner.run(queryPlus.withQuery(newQuery), responseContext);
}
}
}
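
The same verify-then-swap idea, reduced to a generic, self-contained form (a hypothetical SwappingFunction, not part of this commit). It also shows why reference equality is used above: an equal-but-distinct instance is deliberately rejected.

import java.util.function.Function;

final class SwappingFunction<T, R> implements Function<T, R>
{
  private final Function<T, R> delegate;
  private final T expected;
  private final T replacement;

  SwappingFunction(Function<T, R> delegate, T expected, T replacement)
  {
    this.delegate = delegate;
    this.expected = expected;
    this.replacement = replacement;
  }

  @Override
  public R apply(T input)
  {
    // Deliberate reference comparison, just like QuerySwappingQueryRunner above.
    //noinspection ObjectEquality
    if (input != expected) {
      throw new IllegalStateException("Unexpected input received");
    }
    return delegate.apply(replacement);
  }

  public static void main(String[] args)
  {
    final String original = new String("q");
    final SwappingFunction<String, Integer> fn =
        new SwappingFunction<>(String::length, original, "rewritten");

    System.out.println(fn.apply(original));        // same instance: delegates with "rewritten" -> 9
    System.out.println(fn.apply(new String("q"))); // equal but not the same instance -> throws
  }
}
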


@ -49,6 +49,8 @@ import java.util.stream.StreamSupport;
* The datasource for the query must satisfy {@link DataSourceAnalysis#isConcreteBased()} and
* {@link DataSourceAnalysis#isGlobal()}. Its base datasource must also be handleable by the provided
* {@link SegmentWrangler}.
*
* Mainly designed to be used by {@link ClientQuerySegmentWalker}.
*/
public class LocalQuerySegmentWalker implements QuerySegmentWalker
{

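
A minimal sketch of the eligibility check implied by the isConcreteBased()/isGlobal() constraints in the Javadoc above; it is derived from that documentation rather than taken from this commit (the real walker additionally requires its SegmentWrangler to handle the base datasource).

import org.apache.druid.query.DataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;

final class LocalWalkerEligibilitySketch
{
  // Hypothetical helper: the datasource must be both concrete-based and global for
  // LocalQuerySegmentWalker to be a candidate.
  static boolean eligibleForLocalWalker(DataSource dataSource)
  {
    final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(dataSource);
    return analysis.isConcreteBased() && analysis.isGlobal();
  }
}
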

@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;
/**
*
*/
public class ServerConfig
{
@ -48,6 +49,7 @@ public class ServerConfig
@NotNull Period maxIdleTime,
long defaultQueryTimeout,
long maxScatterGatherBytes,
int maxSubqueryRows,
long maxQueryTimeout,
int maxRequestHeaderSize,
@NotNull Period gracefulShutdownTimeout,
@ -63,6 +65,7 @@ public class ServerConfig
this.maxIdleTime = maxIdleTime;
this.defaultQueryTimeout = defaultQueryTimeout;
this.maxScatterGatherBytes = maxScatterGatherBytes;
this.maxSubqueryRows = maxSubqueryRows;
this.maxQueryTimeout = maxQueryTimeout;
this.maxRequestHeaderSize = maxRequestHeaderSize;
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
@ -100,6 +103,10 @@ public class ServerConfig
@Min(1)
private long maxScatterGatherBytes = Long.MAX_VALUE;
@JsonProperty
@Min(1)
private int maxSubqueryRows = 100000;
@JsonProperty
@Min(1)
private long maxQueryTimeout = Long.MAX_VALUE;
@ -157,6 +164,11 @@ public class ServerConfig
return maxScatterGatherBytes;
}
public int getMaxSubqueryRows()
{
return maxSubqueryRows;
}
public long getMaxQueryTimeout()
{
return maxQueryTimeout;
@ -207,6 +219,7 @@ public class ServerConfig
enableRequestLimit == that.enableRequestLimit &&
defaultQueryTimeout == that.defaultQueryTimeout &&
maxScatterGatherBytes == that.maxScatterGatherBytes &&
maxSubqueryRows == that.maxSubqueryRows &&
maxQueryTimeout == that.maxQueryTimeout &&
maxRequestHeaderSize == that.maxRequestHeaderSize &&
inflateBufferSize == that.inflateBufferSize &&
@ -227,6 +240,7 @@ public class ServerConfig
maxIdleTime,
defaultQueryTimeout,
maxScatterGatherBytes,
maxSubqueryRows,
maxQueryTimeout,
maxRequestHeaderSize,
gracefulShutdownTimeout,
@ -247,6 +261,7 @@ public class ServerConfig
", maxIdleTime=" + maxIdleTime +
", defaultQueryTimeout=" + defaultQueryTimeout +
", maxScatterGatherBytes=" + maxScatterGatherBytes +
", maxSubqueryRows=" + maxSubqueryRows +
", maxQueryTimeout=" + maxQueryTimeout +
", maxRequestHeaderSize=" + maxRequestHeaderSize +
", gracefulShutdownTimeout=" + gracefulShutdownTimeout +

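
For reference, the new maxSubqueryRows property above deserializes like any other ServerConfig field. The sketch below builds a ServerConfig from a property map, mirroring what ClientQuerySegmentWalkerTest.initWalker does later in this commit; in a Broker's runtime.properties this corresponds to a key under the usual druid.server.http.* binding for ServerConfig (stated as an assumption, since the binding itself is not shown in this diff).

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.initialization.ServerConfig;

final class MaxSubqueryRowsConfigSketch
{
  static ServerConfig smallSubqueryLimit()
  {
    // Only the new key is overridden; unset keys keep their defaults (100,000 rows here).
    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
    return jsonMapper.convertValue(ImmutableMap.of("maxSubqueryRows", "2"), ServerConfig.class);
    // smallSubqueryLimit().getMaxSubqueryRows() would then return 2.
  }
}
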

@ -152,6 +152,7 @@ public class CliIndexerServerModule implements Module
oldConfig.getMaxIdleTime(),
oldConfig.getDefaultQueryTimeout(),
oldConfig.getMaxScatterGatherBytes(),
oldConfig.getMaxSubqueryRows(),
oldConfig.getMaxQueryTimeout(),
oldConfig.getMaxRequestHeaderSize(),
oldConfig.getGracefulShutdownTimeout(),


@ -44,6 +44,7 @@ public class ServerConfigSerdeTest
defaultConfig.getMaxIdleTime(),
defaultConfig.getDefaultQueryTimeout(),
defaultConfig.getMaxScatterGatherBytes(),
defaultConfig.getMaxSubqueryRows(),
defaultConfig.getMaxQueryTimeout(),
defaultConfig.getMaxRequestHeaderSize(),
defaultConfig.getGracefulShutdownTimeout(),


@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
@ -41,12 +42,11 @@ public class InlineSegmentWranglerTest
private final InlineSegmentWrangler factory = new InlineSegmentWrangler();
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of("str", "long"),
ImmutableList.of(ValueType.STRING, ValueType.LONG),
ImmutableList.of(
new Object[]{"foo", 1L},
new Object[]{"bar", 2L}
)
),
RowSignature.builder().add("str", ValueType.STRING).add("long", ValueType.LONG).build()
);
@Test


@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.hamcrest.CoreMatchers;
@ -43,12 +44,11 @@ public class InlineJoinableFactoryTest
private final InlineJoinableFactory factory = new InlineJoinableFactory();
private final InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of("str", "long"),
ImmutableList.of(ValueType.STRING, ValueType.LONG),
ImmutableList.of(
new Object[]{"foo", 1L},
new Object[]{"bar", 2L}
)
),
RowSignature.builder().add("str", ValueType.STRING).add("long", ValueType.LONG).build()
);
@Test


@ -0,0 +1,620 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChestTestHelper;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.UnionDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryHelper;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Tests ClientQuerySegmentWalker.
*
* Note that since SpecificSegmentsQuerySegmentWalker in the druid-sql module also uses ClientQuerySegmentWalker, it's
* also exercised pretty well by the SQL tests (especially CalciteQueryTest). This class adds an extra layer of testing.
* In particular, this class makes it easier to add tests that validate queries are *made* in the correct way, not just
* that they return the correct results.
*/
public class ClientQuerySegmentWalkerTest
{
private static final Logger log = new Logger(ClientQuerySegmentWalkerTest.class);
private static final String FOO = "foo";
private static final String BAR = "bar";
private static final Interval INTERVAL = Intervals.of("2000/P1Y");
private static final String VERSION = "A";
private static final ShardSpec SHARD_SPEC = new NumberedShardSpec(0, 1);
private static final InlineDataSource FOO_INLINE = InlineDataSource.fromIterable(
ImmutableList.<Object[]>builder()
.add(new Object[]{INTERVAL.getStartMillis(), "x", 1})
.add(new Object[]{INTERVAL.getStartMillis(), "x", 2})
.add(new Object[]{INTERVAL.getStartMillis(), "y", 3})
.add(new Object[]{INTERVAL.getStartMillis(), "z", 4})
.build(),
RowSignature.builder()
.addTimeColumn()
.add("s", ValueType.STRING)
.add("n", ValueType.LONG)
.build()
);
private static final InlineDataSource BAR_INLINE = InlineDataSource.fromIterable(
ImmutableList.<Object[]>builder()
.add(new Object[]{INTERVAL.getStartMillis(), "a", 1})
.add(new Object[]{INTERVAL.getStartMillis(), "a", 2})
.add(new Object[]{INTERVAL.getStartMillis(), "b", 3})
.add(new Object[]{INTERVAL.getStartMillis(), "c", 4})
.build(),
RowSignature.builder()
.addTimeColumn()
.add("s", ValueType.STRING)
.add("n", ValueType.LONG)
.build()
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
private Closer closer;
private QueryRunnerFactoryConglomerate conglomerate;
// Queries that are issued; checked by "testQuery" against its "expectedQueries" parameter.
private List<ExpectedQuery> issuedQueries = new ArrayList<>();
// A ClientQuerySegmentWalker that has two segments: one for FOO and one for BAR; each with interval INTERVAL,
// version VERSION, and shard spec SHARD_SPEC.
private ClientQuerySegmentWalker walker;
@Before
public void setUp()
{
closer = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
initWalker(ImmutableMap.of());
}
@After
public void tearDown() throws IOException
{
closer.close();
}
@Test
public void testTimeseriesOnTable()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(FOO)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false))
.build();
testQuery(
query,
ImmutableList.of(ExpectedQuery.cluster(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
}
@Test
public void testTimeseriesOnInline()
{
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(FOO_INLINE)
.granularity(Granularities.ALL)
.intervals(Collections.singletonList(INTERVAL))
.aggregators(new LongSumAggregatorFactory("sum", "n"))
.build();
testQuery(
query,
ImmutableList.of(ExpectedQuery.local(query)),
ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L})
);
}
@Test
public void testTimeseriesOnGroupByOnTable()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build();
testQuery(
query,
ImmutableList.of(
ExpectedQuery.cluster(subquery),
ExpectedQuery.local(
query.withDataSource(
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"x"}, new Object[]{"y"}, new Object[]{"z"}),
RowSignature.builder().add("s", ValueType.STRING).build()
)
)
)
),
ImmutableList.of(new Object[]{Intervals.ETERNITY.getStartMillis(), 3L})
);
}
@Test
public void testGroupByOnGroupByOnTable()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(new QueryDataSource(subquery))
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
testQuery(
query,
// GroupBy handles its own subqueries; only the inner one will go to the cluster.
ImmutableList.of(ExpectedQuery.cluster(subquery)),
ImmutableList.of(new Object[]{3L})
);
}
@Test
public void testGroupByOnUnionOfTwoTables()
{
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(
new UnionDataSource(
ImmutableList.of(
new TableDataSource(FOO),
new TableDataSource(BAR)
)
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
testQuery(
query,
ImmutableList.of(
ExpectedQuery.cluster(query.withDataSource(new TableDataSource(FOO))),
ExpectedQuery.cluster(query.withDataSource(new TableDataSource(BAR)))
),
ImmutableList.of(
new Object[]{"a", 2L},
new Object[]{"b", 1L},
new Object[]{"c", 1L},
new Object[]{"x", 2L},
new Object[]{"y", 1L},
new Object[]{"z", 1L}
)
);
}
@Test
public void testJoinOnGroupByOnTable()
{
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.setDimFilter(new SelectorDimFilter("s", "y", null))
.build();
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
new TableDataSource(FOO),
new QueryDataSource(subquery),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
testQuery(
query,
ImmutableList.of(
ExpectedQuery.cluster(subquery),
ExpectedQuery.cluster(
query.withDataSource(
query.getDataSource().withChildren(
ImmutableList.of(
query.getDataSource().getChildren().get(0),
InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{"y"}),
RowSignature.builder().add("s", ValueType.STRING).build()
)
)
)
)
)
),
ImmutableList.of(new Object[]{"y", "y", 1L})
);
}
@Test
public void testJoinOnTableErrorCantInlineTable()
{
final GroupByQuery query =
GroupByQuery.builder()
.setDataSource(
JoinDataSource.create(
new TableDataSource(FOO),
new TableDataSource(BAR),
"j.",
"\"j.s\" == \"s\"",
JoinType.INNER,
ExprMacroTable.nil()
)
)
.setGranularity(Granularities.ALL)
.setInterval(Intervals.ONLY_ETERNITY)
.setDimensions(DefaultDimensionSpec.of("s"), DefaultDimensionSpec.of("j.s"))
.setAggregatorSpecs(new CountAggregatorFactory("cnt"))
.build();
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("Cannot handle subquery structure for dataSource");
testQuery(query, ImmutableList.of(), ImmutableList.of());
}
@Test
public void testTimeseriesOnGroupByOnTableErrorTooManyRows()
{
initWalker(ImmutableMap.of("maxSubqueryRows", "2"));
final GroupByQuery subquery =
GroupByQuery.builder()
.setDataSource(FOO)
.setGranularity(Granularities.ALL)
.setInterval(Collections.singletonList(INTERVAL))
.setDimensions(DefaultDimensionSpec.of("s"))
.build();
final TimeseriesQuery query =
Druids.newTimeseriesQueryBuilder()
.dataSource(new QueryDataSource(subquery))
.granularity(Granularities.ALL)
.intervals(Intervals.ONLY_ETERNITY)
.aggregators(new CountAggregatorFactory("cnt"))
.build();
expectedException.expect(ResourceLimitExceededException.class);
expectedException.expectMessage("Subquery generated results beyond maximum[2]");
testQuery(query, ImmutableList.of(), ImmutableList.of());
}
/**
* Initialize (or reinitialize) our {@link #walker} and {@link #closer}.
*/
private void initWalker(final Map<String, String> serverProperties)
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
final ServerConfig serverConfig = jsonMapper.convertValue(serverProperties, ServerConfig.class);
final SegmentWrangler segmentWrangler = new MapSegmentWrangler(
ImmutableMap.<Class<? extends DataSource>, SegmentWrangler>builder()
.put(InlineDataSource.class, new InlineSegmentWrangler())
.build()
);
final JoinableFactory joinableFactory = new MapJoinableFactory(
ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
.put(InlineDataSource.class, new InlineJoinableFactory())
.build()
);
class CapturingWalker implements QuerySegmentWalker
{
private QuerySegmentWalker baseWalker;
private ClusterOrLocal how;
CapturingWalker(QuerySegmentWalker baseWalker, ClusterOrLocal how)
{
this.baseWalker = baseWalker;
this.how = how;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals)
{
final QueryRunner<T> baseRunner = baseWalker.getQueryRunnerForIntervals(query, intervals);
return (queryPlus, responseContext) -> {
log.info("Query (%s): %s", how, queryPlus.getQuery());
issuedQueries.add(new ExpectedQuery(queryPlus.getQuery(), how));
return baseRunner.run(queryPlus, responseContext);
};
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
final QueryRunner<T> baseRunner = baseWalker.getQueryRunnerForSegments(query, specs);
return (queryPlus, responseContext) -> {
log.info("Query (%s): %s", how, queryPlus.getQuery());
issuedQueries.add(new ExpectedQuery(queryPlus.getQuery(), how));
return baseRunner.run(queryPlus, responseContext);
};
}
}
walker = QueryStackTests.createClientQuerySegmentWalker(
new CapturingWalker(
QueryStackTests.createClusterQuerySegmentWalker(
ImmutableMap.of(
FOO, makeTimeline(FOO, FOO_INLINE),
BAR, makeTimeline(BAR, BAR_INLINE)
),
joinableFactory,
conglomerate,
null /* QueryScheduler */
),
ClusterOrLocal.CLUSTER
),
new CapturingWalker(
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
segmentWrangler,
joinableFactory
),
ClusterOrLocal.LOCAL
),
conglomerate,
serverConfig
);
}
/**
* Issue {@code query} through {@link #walker}. Verifies that a specific set of subqueries is issued and that a
* specific set of results is returned. The results are expected to be in array form; see
* {@link org.apache.druid.query.QueryToolChest#resultsAsArrays}.
*/
private <T> void testQuery(
final Query<T> query,
final List<ExpectedQuery> expectedQueries,
final List<Object[]> expectedResults
)
{
issuedQueries.clear();
final Sequence<T> resultSequence =
QueryPlus.wrap(query).run(walker, ResponseContext.createEmpty());
final List<Object[]> arrays =
conglomerate.findFactory(query).getToolchest().resultsAsArrays(query, resultSequence).toList();
for (Object[] array : arrays) {
log.info("Result: %s", Arrays.toString(array));
}
QueryToolChestTestHelper.assertArrayResultsEquals(expectedResults, Sequences.simple(arrays));
Assert.assertEquals(expectedQueries, issuedQueries);
}
private enum ClusterOrLocal
{
CLUSTER,
LOCAL
}
private static class ExpectedQuery
{
private final Query<?> query;
private final ClusterOrLocal how;
ExpectedQuery(Query<?> query, ClusterOrLocal how)
{
// Overwrite various context parameters that will vary between runs and aren't important to test for.
this.query = query.withOverriddenContext(
ImmutableMap.<String, Object>builder()
.put(BaseQuery.SUB_QUERY_ID, "dummy")
.put(DirectDruidClient.QUERY_FAIL_TIME, 0L)
.put(QueryContexts.DEFAULT_TIMEOUT_KEY, 0L)
.put(QueryContexts.FINALIZE_KEY, true)
.put(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 0L)
.put(GroupByQuery.CTX_KEY_SORT_BY_DIMS_FIRST, false)
.put(GroupByQueryHelper.CTX_KEY_SORT_RESULTS, false)
.put(GroupByQueryConfig.CTX_KEY_ARRAY_RESULT_ROWS, true)
.put(GroupByQueryConfig.CTX_KEY_STRATEGY, "X")
.put(GroupByQueryConfig.CTX_KEY_APPLY_LIMIT_PUSH_DOWN, true)
.put(GroupByStrategyV2.CTX_KEY_OUTERMOST, true)
.put(GroupByStrategyV2.CTX_KEY_FUDGE_TIMESTAMP, "1979")
.build()
);
this.how = how;
}
static ExpectedQuery local(final Query query)
{
return new ExpectedQuery(query, ClusterOrLocal.LOCAL);
}
static ExpectedQuery cluster(final Query query)
{
return new ExpectedQuery(query, ClusterOrLocal.CLUSTER);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ExpectedQuery that = (ExpectedQuery) o;
return Objects.equals(query, that.query) &&
how == that.how;
}
@Override
public int hashCode()
{
return Objects.hash(query, how);
}
@Override
public String toString()
{
return "ExpectedQuery{" +
"query=" + query +
", how=" + how +
'}';
}
}
private static VersionedIntervalTimeline<String, ReferenceCountingSegment> makeTimeline(
final String name,
final InlineDataSource dataSource
)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline =
new VersionedIntervalTimeline<>(Comparator.naturalOrder());
timeline.add(
INTERVAL,
VERSION,
SHARD_SPEC.createChunk(
ReferenceCountingSegment.wrapSegment(
new RowBasedSegment<>(
SegmentId.of(name, INTERVAL, VERSION, SHARD_SPEC.getPartitionNum()),
dataSource.getRows(),
dataSource.rowAdapter(),
dataSource.getRowSignature()
),
SHARD_SPEC
)
)
);
return timeline;
}
}


@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Map;
/**
* Utilities for creating query-stack objects for tests.
*/
public class QueryStackTests
{
private static final ServiceEmitter EMITTER = new NoopServiceEmitter();
private static final int COMPUTE_BUFFER_SIZE = 10 * 1024 * 1024;
private QueryStackTests()
{
// No instantiation.
}
public static ClientQuerySegmentWalker createClientQuerySegmentWalker(
final QuerySegmentWalker clusterWalker,
final QuerySegmentWalker localWalker,
final QueryRunnerFactoryConglomerate conglomerate,
final ServerConfig serverConfig
)
{
return new ClientQuerySegmentWalker(
EMITTER,
clusterWalker,
localWalker,
new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return conglomerate.findFactory(query).getToolchest();
}
},
new RetryQueryRunnerConfig(),
TestHelper.makeJsonMapper(),
serverConfig,
null /* Cache */,
new CacheConfig()
{
@Override
public boolean isPopulateCache()
{
return false;
}
@Override
public boolean isUseCache()
{
return false;
}
@Override
public boolean isPopulateResultLevelCache()
{
return false;
}
@Override
public boolean isUseResultLevelCache()
{
return false;
}
}
);
}
public static TestClusterQuerySegmentWalker createClusterQuerySegmentWalker(
Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines,
JoinableFactory joinableFactory,
QueryRunnerFactoryConglomerate conglomerate,
@Nullable QueryScheduler scheduler
)
{
return new TestClusterQuerySegmentWalker(timelines, joinableFactory, conglomerate, scheduler);
}
public static LocalQuerySegmentWalker createLocalQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final SegmentWrangler segmentWrangler,
final JoinableFactory joinableFactory
)
{
return new LocalQuerySegmentWalker(conglomerate, segmentWrangler, joinableFactory, EMITTER);
}
/**
* Returns a new {@link QueryRunnerFactoryConglomerate}. Adds relevant closeables to the passed-in {@link Closer}.
*/
public static QueryRunnerFactoryConglomerate createQueryRunnerFactoryConglomerate(final Closer closer)
{
final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(COMPUTE_BUFFER_SIZE)
);
closer.register(stupidPool);
final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair =
GroupByQueryRunnerTest.makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return COMPUTE_BUFFER_SIZE;
}
@Override
public int getNumThreads()
{
// Only use 1 thread for tests.
return 1;
}
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable.
return 3;
}
}
);
final GroupByQueryRunnerFactory groupByQueryRunnerFactory = factoryCloserPair.lhs;
closer.register(factoryCloserPair.rhs);
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
SegmentMetadataQuery.class,
new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(
new SegmentMetadataQueryConfig("P1W")
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
ScanQuery.class,
new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
TopNQuery.class,
new TopNQueryRunnerFactory(
stupidPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(GroupByQuery.class, groupByQueryRunnerFactory)
.build()
);
return conglomerate;
}
}


@ -0,0 +1,295 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* Mimics the behavior of {@link org.apache.druid.client.CachingClusteredClient} when it queries data servers (like
* Historicals, which use {@link org.apache.druid.server.coordination.ServerManager}). Used by {@link QueryStackTests}.
*
* This class's logic is like a mashup of those two classes. With the right abstractions, it may be possible to get rid
* of this class and replace it with the production classes.
*/
public class TestClusterQuerySegmentWalker implements QuerySegmentWalker
{
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines;
private final JoinableFactory joinableFactory;
private final QueryRunnerFactoryConglomerate conglomerate;
@Nullable
private final QueryScheduler scheduler;
TestClusterQuerySegmentWalker(
Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines,
JoinableFactory joinableFactory,
QueryRunnerFactoryConglomerate conglomerate,
@Nullable QueryScheduler scheduler
)
{
this.timelines = timelines;
this.joinableFactory = joinableFactory;
this.conglomerate = conglomerate;
this.scheduler = scheduler;
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
// Just like CachingClusteredClient, ignore "query" and defer action until the QueryRunner is called.
// Strange, but true. Required to get authentic behavior with UnionDataSources. (Although, it would be great if
// this wasn't required.)
return (queryPlus, responseContext) -> {
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(queryPlus.getQuery().getDataSource());
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", queryPlus.getQuery().getDataSource());
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
.create(intervals)
.transformCat(interval -> getSegmentsForTable(dataSourceName, interval))
.transform(WindowedSegment::getDescriptor);
return getQueryRunnerForSegments(queryPlus.getQuery(), segmentDescriptors).run(queryPlus, responseContext);
};
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", query.getDataSource());
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
// Make sure this query type can handle the subquery, if present.
if (analysis.isQuery()
&& !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
}
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
new AtomicLong(),
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
);
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
)
)
),
toolChest
);
// Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
// This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
// to function properly.
return (theQuery, responseContext) -> {
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
return scheduler.run(
scheduler.prioritizeAndLaneQuery(theQuery, segments),
new LazySequence<>(
() -> baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(
theQuery.getQuery(),
ImmutableList.copyOf(specs)
)),
responseContext
)
)
);
} else {
return baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
responseContext
);
}
};
}
private <T> QueryRunner<T> makeTableRunner(
final QueryToolChest<T, Query<T>> toolChest,
final QueryRunnerFactory<T, Query<T>> factory,
final Iterable<WindowedSegment> segments,
final Function<Segment, Segment> segmentMapFn
)
{
final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
if (segmentsList.isEmpty()) {
// Note: this is not correct when there's a right or full outer join going on.
// See https://github.com/apache/druid/issues/9229 for details.
return new NoopQueryRunner<>();
}
return new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
Execs.directExecutor(),
FunctionalIterable
.create(segmentsList)
.transform(
segment ->
new SpecificSegmentQueryRunner<>(
factory.createRunner(segmentMapFn.apply(segment.getSegment())),
new SpecificSegmentSpec(segment.getDescriptor())
)
)
)
),
toolChest
);
}
private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Interval interval)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
if (timeline == null) {
return Collections.emptyList();
} else {
final List<WindowedSegment> retVal = new ArrayList<>();
for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : timeline.lookup(interval)) {
for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
retVal.add(new WindowedSegment(chunk.getObject(), holder.getInterval()));
}
}
return retVal;
}
}
private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Iterable<SegmentDescriptor> specs)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
if (timeline == null) {
return Collections.emptyList();
} else {
final List<WindowedSegment> retVal = new ArrayList<>();
for (SegmentDescriptor spec : specs) {
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
spec.getInterval(),
spec.getVersion()
);
retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval()));
}
return retVal;
}
}
private static class WindowedSegment
{
private final Segment segment;
private final Interval interval;
public WindowedSegment(Segment segment, Interval interval)
{
this.segment = segment;
this.interval = interval;
Preconditions.checkArgument(segment.getId().getInterval().contains(interval));
}
public Segment getSegment()
{
return segment;
}
public Interval getInterval()
{
return interval;
}
public SegmentDescriptor getDescriptor()
{
return new SegmentDescriptor(interval, segment.getId().getVersion(), segment.getId().getPartitionNum());
}
}
}


@ -48,6 +48,7 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.log.TestRequestLogger;
@ -127,10 +128,8 @@ public class DruidAvaticaHandlerTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass


@ -26,10 +26,10 @@ import org.apache.calcite.avatica.Meta;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.sql.SqlLifecycleFactory;
@ -67,10 +67,8 @@ public class DruidStatementTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass


@ -28,7 +28,6 @@ import org.apache.druid.common.config.NullHandling;
import org.apache.druid.hll.VersionOneHyperLogLogCollector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
@ -63,6 +62,7 @@ import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.ForbiddenException;
@ -407,10 +407,8 @@ public class BaseCalciteQueryTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass


@ -20,9 +20,9 @@
package org.apache.druid.sql.calcite.schema;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.planner.PlannerConfig;
import org.apache.druid.sql.calcite.util.CalciteTestBase;
@ -42,14 +42,12 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
@Test
public void testInitializationWithNoData() throws Exception
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
try {
try (final Closer closer = Closer.create()) {
final QueryRunnerFactoryConglomerate conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
final DruidSchema druidSchema = new DruidSchema(
CalciteTests.createMockQueryLifecycleFactory(
new SpecificSegmentsQuerySegmentWalker(conglomerateCloserPair.lhs),
conglomerateCloserPair.lhs
new SpecificSegmentsQuerySegmentWalker(conglomerate),
conglomerate
),
new TestServerInventoryView(Collections.emptyList()),
PLANNER_CONFIG_DEFAULT,
@ -62,8 +60,5 @@ public class DruidSchemaNoDataInitTest extends CalciteTestBase
Assert.assertEquals(ImmutableMap.of(), druidSchema.getTableMap());
}
finally {
conglomerateCloserPair.rhs.close();
}
}
}


@ -42,6 +42,7 @@ import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.security.NoopEscalator;
@ -96,10 +97,8 @@ public class DruidSchemaTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass


@ -50,7 +50,6 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.http.client.Request;
@ -70,6 +69,7 @@ import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
@ -155,10 +155,8 @@ public class SystemSchemaTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass


@ -34,7 +34,6 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@ -52,8 +51,6 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
@ -61,12 +58,8 @@ import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
@ -76,29 +69,7 @@ import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.query.expression.LookupExprMacro;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryConfig;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanQueryQueryToolChest;
import org.apache.druid.query.scan.ScanQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
@ -143,7 +114,6 @@ import org.joda.time.chrono.ISOChronology;
import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -560,106 +530,6 @@ public class CalciteTests
public static final DruidViewMacroFactory DRUID_VIEW_MACRO_FACTORY = new TestDruidViewMacroFactory();
/**
* Returns a new {@link QueryRunnerFactoryConglomerate} and a {@link Closer} which should be closed at the end of the
* test.
*/
public static Pair<QueryRunnerFactoryConglomerate, Closer> createQueryRunnerFactoryConglomerate()
{
final Closer resourceCloser = Closer.create();
final CloseableStupidPool<ByteBuffer> stupidPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(10 * 1024 * 1024)
);
resourceCloser.register(stupidPool);
final Pair<GroupByQueryRunnerFactory, Closer> factoryCloserPair = GroupByQueryRunnerTest
.makeQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return 10 * 1024 * 1024;
}
@Override
public int getNumThreads()
{
// Only use 1 thread for tests.
return 1;
}
@Override
public int getNumMergeBuffers()
{
// Need 3 buffers for CalciteQueryTest.testDoubleNestedGroupby.
// Two buffers for the broker and one for the queryable
return 3;
}
}
);
final GroupByQueryRunnerFactory factory = factoryCloserPair.lhs;
resourceCloser.register(factoryCloserPair.rhs);
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
SegmentMetadataQuery.class,
new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(
new SegmentMetadataQueryConfig("P1W")
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
ScanQuery.class,
new ScanQueryRunnerFactory(
new ScanQueryQueryToolChest(
new ScanQueryConfig(),
new DefaultGenericQueryMetricsFactory()
),
new ScanQueryEngine(),
new ScanQueryConfig()
)
)
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
TopNQuery.class,
new TopNQueryRunnerFactory(
stupidPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(GroupByQuery.class, factory)
.build()
);
return Pair.of(conglomerate, resourceCloser);
}
public static QueryLifecycleFactory createMockQueryLifecycleFactory(
final QuerySegmentWalker walker,
final QueryRunnerFactoryConglomerate conglomerate


@ -19,62 +19,34 @@
package org.apache.druid.sql.calcite.util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.io.Closeables;
import org.apache.druid.client.SegmentServerSelector;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.MapSegmentWrangler;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.Joinables;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.LocalQuerySegmentWalker;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.TimelineObjectHolder;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -83,28 +55,20 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
/**
* A self-contained class that executes queries similarly to the normal Druid query stack.
*
* {@link ClientQuerySegmentWalker}, the same class that Brokers use as the entry point for their query stack, is
* used directly. Our own class {@link DataServerLikeWalker} mimics the behavior of
* {@link org.apache.druid.server.coordination.ServerManager}, the entry point for Historicals. That class isn't used
* directly because the sheer volume of dependencies makes it quite verbose to use in a test environment.
* used directly. It, and the sub-walkers it needs, are created by {@link QueryStackTests}.
*/
public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, Closeable
{
private final QueryRunnerFactoryConglomerate conglomerate;
private final QuerySegmentWalker walker;
private final JoinableFactory joinableFactory;
private final QueryScheduler scheduler;
private final Map<String, VersionedIntervalTimeline<String, ReferenceCountingSegment>> timelines = new HashMap<>();
private final List<Closeable> closeables = new ArrayList<>();
private final List<DataSegment> segments = new ArrayList<>();
@ -121,97 +85,41 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
@Nullable final QueryScheduler scheduler
)
{
final NoopServiceEmitter emitter = new NoopServiceEmitter();
final JoinableFactory joinableFactoryToUse;
this.conglomerate = conglomerate;
this.joinableFactory = joinableFactory == null ?
MapJoinableFactoryTest.fromMap(
if (joinableFactory == null) {
joinableFactoryToUse = MapJoinableFactoryTest.fromMap(
ImmutableMap.<Class<? extends DataSource>, JoinableFactory>builder()
.put(InlineDataSource.class, new InlineJoinableFactory())
.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider))
.build()
) : joinableFactory;
);
} else {
joinableFactoryToUse = joinableFactory;
}
this.scheduler = scheduler;
this.walker = new ClientQuerySegmentWalker(
emitter,
new DataServerLikeWalker(),
new LocalQuerySegmentWalker(
this.walker = QueryStackTests.createClientQuerySegmentWalker(
QueryStackTests.createClusterQuerySegmentWalker(
timelines,
joinableFactoryToUse,
conglomerate,
scheduler
),
QueryStackTests.createLocalQuerySegmentWalker(
conglomerate,
new MapSegmentWrangler(ImmutableMap.of()),
this.joinableFactory,
emitter
joinableFactoryToUse
),
new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return conglomerate.findFactory(query).getToolchest();
}
},
new RetryQueryRunnerConfig(),
TestHelper.makeJsonMapper(),
new ServerConfig(),
null /* Cache */,
new CacheConfig()
{
@Override
public boolean isPopulateCache()
{
return false;
}
@Override
public boolean isUseCache()
{
return false;
}
@Override
public boolean isPopulateResultLevelCache()
{
return false;
}
@Override
public boolean isUseResultLevelCache()
{
return false;
}
}
conglomerate,
new ServerConfig()
);
}
/**
* Create an instance using the provided query runner factory conglomerate and lookup provider.
* If a JoinableFactory is provided, it will be used instead of the default.
*/
public SpecificSegmentsQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
final LookupExtractorFactoryContainerProvider lookupProvider,
@Nullable final JoinableFactory joinableFactory
)
{
this(conglomerate, lookupProvider, joinableFactory, null);
}
/**
* Create an instance without any lookups, using the default JoinableFactory
* Create an instance without any lookups and with a default {@link JoinableFactory} that handles only inline
* datasources.
*/
public SpecificSegmentsQuerySegmentWalker(final QueryRunnerFactoryConglomerate conglomerate)
{
this(conglomerate, null);
}
/**
* Create an instance without any lookups, optionally allowing the default JoinableFactory to be overridden
*/
public SpecificSegmentsQuerySegmentWalker(
final QueryRunnerFactoryConglomerate conglomerate,
@Nullable JoinableFactory joinableFactory
)
{
this(
conglomerate,
@ -229,14 +137,12 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
return Optional.empty();
}
},
joinableFactory
null,
null
);
}
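As a quick reference for the constructor overloads documented above, here is a hedged sketch of how a test might choose between them, assuming the lookup-aware three-argument overload is retained by this patch; lookupProvider and customJoinableFactory are placeholders supplied by the caller.
  // Illustrative only; not part of the patch.
  private static SpecificSegmentsQuerySegmentWalker makeWalker(
      final QueryRunnerFactoryConglomerate conglomerate,
      @Nullable final LookupExtractorFactoryContainerProvider lookupProvider,
      @Nullable final JoinableFactory customJoinableFactory
  )
  {
    if (lookupProvider == null) {
      // No lookups: the default JoinableFactory handles only inline datasources.
      return new SpecificSegmentsQuerySegmentWalker(conglomerate);
    } else {
      // With lookups: passing null keeps the default JoinableFactory, non-null overrides it.
      return new SpecificSegmentsQuerySegmentWalker(conglomerate, lookupProvider, customJoinableFactory);
    }
  }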
public SpecificSegmentsQuerySegmentWalker add(
final DataSegment descriptor,
final QueryableIndex index
)
public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, final QueryableIndex index)
{
final Segment segment = new QueryableIndexSegment(index, descriptor.getId());
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.computeIfAbsent(
@ -259,19 +165,13 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
}
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(
final Query<T> query,
final Iterable<Interval> intervals
)
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
return walker.getQueryRunnerForIntervals(query, intervals);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(
final Query<T> query,
final Iterable<SegmentDescriptor> specs
)
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
return walker.getQueryRunnerForSegments(query, specs);
}
@ -283,209 +183,4 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C
Closeables.close(closeable, true);
}
}
private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Interval interval)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
if (timeline == null) {
return Collections.emptyList();
} else {
final List<WindowedSegment> retVal = new ArrayList<>();
for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : timeline.lookup(interval)) {
for (PartitionChunk<ReferenceCountingSegment> chunk : holder.getObject()) {
retVal.add(new WindowedSegment(chunk.getObject(), holder.getInterval()));
}
}
return retVal;
}
}
private List<WindowedSegment> getSegmentsForTable(final String dataSource, final Iterable<SegmentDescriptor> specs)
{
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline = timelines.get(dataSource);
if (timeline == null) {
return Collections.emptyList();
} else {
final List<WindowedSegment> retVal = new ArrayList<>();
for (SegmentDescriptor spec : specs) {
final PartitionHolder<ReferenceCountingSegment> entry = timeline.findEntry(
spec.getInterval(),
spec.getVersion()
);
retVal.add(new WindowedSegment(entry.getChunk(spec.getPartitionNumber()).getObject(), spec.getInterval()));
}
return retVal;
}
}
public static class WindowedSegment
{
private final Segment segment;
private final Interval interval;
public WindowedSegment(Segment segment, Interval interval)
{
this.segment = segment;
this.interval = interval;
Preconditions.checkArgument(segment.getId().getInterval().contains(interval));
}
public Segment getSegment()
{
return segment;
}
public Interval getInterval()
{
return interval;
}
public SegmentDescriptor getDescriptor()
{
return new SegmentDescriptor(interval, segment.getId().getVersion(), segment.getId().getPartitionNum());
}
}
/**
* Mimics the behavior of a data server (e.g. Historical).
*
* Compare to {@link org.apache.druid.server.SegmentManager}.
*/
private class DataServerLikeWalker implements QuerySegmentWalker
{
@Override
public <T> QueryRunner<T> getQueryRunnerForIntervals(final Query<T> query, final Iterable<Interval> intervals)
{
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", query.getDataSource());
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
FunctionalIterable<SegmentDescriptor> segmentDescriptors = FunctionalIterable
.create(intervals)
.transformCat(interval -> getSegmentsForTable(dataSourceName, interval))
.transform(WindowedSegment::getDescriptor);
return getQueryRunnerForSegments(query, segmentDescriptors);
}
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(final Query<T> query, final Iterable<SegmentDescriptor> specs)
{
final QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
if (factory == null) {
throw new ISE("Unknown query type[%s].", query.getClass());
}
final DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(query.getDataSource());
if (!analysis.isConcreteTableBased()) {
throw new ISE("Cannot handle datasource: %s", query.getDataSource());
}
final String dataSourceName = ((TableDataSource) analysis.getBaseDataSource()).getName();
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
// Make sure this query type can handle the subquery, if present.
if (analysis.isQuery()
&& !toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())) {
throw new ISE("Cannot handle subquery: %s", analysis.getDataSource());
}
final Function<Segment, Segment> segmentMapFn = Joinables.createSegmentMapFn(
analysis.getPreJoinableClauses(),
joinableFactory,
new AtomicLong(),
QueryContexts.getEnableJoinFilterPushDown(query),
QueryContexts.getEnableJoinFilterRewrite(query),
QueryContexts.getEnableJoinFilterRewriteValueColumnFilters(query),
QueryContexts.getJoinFilterRewriteMaxSize(query),
query.getFilter() == null ? null : query.getFilter().toFilter(),
query.getVirtualColumns()
);
final QueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
makeTableRunner(toolChest, factory, getSegmentsForTable(dataSourceName, specs), segmentMapFn)
)
)
),
toolChest
);
// Wrap baseRunner in a runner that rewrites the QuerySegmentSpec to mention the specific segments.
// This mimics what CachingClusteredClient on the Broker does, and is required for certain queries (like Scan)
// to function properly.
return (theQuery, responseContext) -> {
if (scheduler != null) {
Set<SegmentServerSelector> segments = new HashSet<>();
specs.forEach(spec -> segments.add(new SegmentServerSelector(null, spec)));
return scheduler.run(
scheduler.prioritizeAndLaneQuery(theQuery, segments),
new LazySequence<>(
() -> baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(
theQuery.getQuery(),
ImmutableList.copyOf(specs)
)),
responseContext
)
)
);
} else {
return baseRunner.run(
theQuery.withQuery(Queries.withSpecificSegments(theQuery.getQuery(), ImmutableList.copyOf(specs))),
responseContext
);
}
};
}
private <T> QueryRunner<T> makeTableRunner(
final QueryToolChest<T, Query<T>> toolChest,
final QueryRunnerFactory<T, Query<T>> factory,
final Iterable<WindowedSegment> segments,
final Function<Segment, Segment> segmentMapFn
)
{
final List<WindowedSegment> segmentsList = Lists.newArrayList(segments);
if (segmentsList.isEmpty()) {
// Note: this is not correct when there's a right or full outer join going on.
// See https://github.com/apache/druid/issues/9229 for details.
return new NoopQueryRunner<>();
}
return new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
factory.mergeRunners(
Execs.directExecutor(),
FunctionalIterable
.create(segmentsList)
.transform(
segment ->
new SpecificSegmentQueryRunner<>(
factory.createRunner(segmentMapFn.apply(segment.getSegment())),
new SpecificSegmentSpec(segment.getDescriptor())
)
)
)
),
toolChest
);
}
}
}
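Pulling the pieces of this class together, here is a hedged end-to-end sketch of how a test typically drives it: build a walker over the conglomerate, register a segment, run a query through the same entry point a Broker would use, and close everything afterwards. The segment, index, and query objects are placeholders created elsewhere in a real test and are not part of this patch.
  @Test
  public void exampleEndToEnd() throws IOException
  {
    final Closer closer = Closer.create();
    final QueryRunnerFactoryConglomerate conglomerate =
        QueryStackTests.createQueryRunnerFactoryConglomerate(closer);
    // "dataSegment" and "queryableIndex" stand in for whatever a real test generates,
    // e.g. with SegmentGenerator.
    final SpecificSegmentsQuerySegmentWalker walker = closer.register(
        new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, queryableIndex)
    );
    // Queries enter through the same walker interface a Broker uses, so joins and
    // subqueries exercise the inlining path this patch adds.
    final Sequence<Result<TimeseriesResultValue>> results =
        walker.getQueryRunnerForIntervals(timeseriesQuery, timeseriesQuery.getIntervals())
              .run(QueryPlus.wrap(timeseriesQuery), ResponseContext.createEmpty());
    Assert.assertNotNull(results.toList());
    // Closes the walker and releases the conglomerate's pooled resources.
    closer.close();
  }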

View File

@ -44,6 +44,7 @@ import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.server.QueryScheduler;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.log.TestRequestLogger;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -96,7 +97,6 @@ public class SqlResourceTest extends CalciteTestBase
@Rule
public QueryLogHook queryLogHook = QueryLogHook.create();
private SpecificSegmentsQuerySegmentWalker walker = null;
private QueryScheduler scheduler = null;
private TestRequestLogger testRequestLogger;
private SqlResource resource;
private HttpServletRequest req;
@ -105,10 +105,8 @@ public class SqlResourceTest extends CalciteTestBase
@BeforeClass
public static void setUpClass()
{
final Pair<QueryRunnerFactoryConglomerate, Closer> conglomerateCloserPair = CalciteTests
.createQueryRunnerFactoryConglomerate();
conglomerate = conglomerateCloserPair.lhs;
resourceCloser = conglomerateCloserPair.rhs;
resourceCloser = Closer.create();
conglomerate = QueryStackTests.createQueryRunnerFactoryConglomerate(resourceCloser);
}
@AfterClass
@ -120,15 +118,14 @@ public class SqlResourceTest extends CalciteTestBase
@Before
public void setUp() throws Exception
{
executorService = MoreExecutors.listeningDecorator(
Execs.multiThreaded(8, "test_sql_resource_%s")
);
scheduler = new QueryScheduler(
final QueryScheduler scheduler = new QueryScheduler(
5,
ManualQueryPrioritizationStrategy.INSTANCE,
new HiLoQueryLaningStrategy(40),
new ServerConfig()
);
executorService = MoreExecutors.listeningDecorator(Execs.multiThreaded(8, "test_sql_resource_%s"));
walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder(), scheduler);
final PlannerConfig plannerConfig = new PlannerConfig()
@ -139,7 +136,12 @@ public class SqlResourceTest extends CalciteTestBase
return false;
}
};
final SchemaPlus rootSchema = CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, CalciteTests.TEST_AUTHORIZER_MAPPER);
final SchemaPlus rootSchema = CalciteTests.createMockRootSchema(
conglomerate,
walker,
plannerConfig,
CalciteTests.TEST_AUTHORIZER_MAPPER
);
final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable();
final ExprMacroTable macroTable = CalciteTests.createExprMacroTable();
req = EasyMock.createStrictMock(HttpServletRequest.class);