Remove the deprecated interval-chunking stuff. (#9216)

* Remove the deprecated interval-chunking stuff.

See https://github.com/apache/druid/pull/6591, https://github.com/apache/druid/pull/4004#issuecomment-284171911 for details.

* Remove unused import.

* Remove chunkInterval too.
This commit is contained in:
Gian Merlino 2020-01-19 17:14:23 -08:00 committed by GitHub
parent d64bed79f0
commit d21054f7c5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
78 changed files with 248 additions and 1390 deletions

View File

@ -197,9 +197,7 @@ public class FilteredAggregatorBenchmark
qIndex = INDEX_IO.loadIndex(indexFile); qIndex = INDEX_IO.loadIndex(indexFile);
factory = new TimeseriesQueryRunnerFactory( factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );

View File

@ -409,10 +409,7 @@ public class GroupByTypeInterfaceBenchmark
factory = new GroupByQueryRunnerFactory( factory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
)
); );
} }

View File

@ -301,10 +301,7 @@ public class TopNTypeInterfaceBenchmark
0, 0,
Integer.MAX_VALUE Integer.MAX_VALUE
), ),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );
} }

View File

@ -260,9 +260,7 @@ public class CachingClusteredClientBenchmark
.put( .put(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
@ -274,10 +272,7 @@ public class CachingClusteredClientBenchmark
"TopNQueryRunnerFactory-bufferPool", "TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(PROCESSING_BUFFER_SIZE) () -> ByteBuffer.allocate(PROCESSING_BUFFER_SIZE)
), ),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )
@ -374,14 +369,8 @@ public class CachingClusteredClientBenchmark
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
); );
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
strategySelector, return new GroupByQueryRunnerFactory(strategySelector, toolChest);
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(
strategySelector,
toolChest
);
} }
@TearDown(Level.Trial) @TearDown(Level.Trial)

View File

@ -572,10 +572,7 @@ public class GroupByBenchmark
factory = new GroupByQueryRunnerFactory( factory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
)
); );
} }

View File

@ -19,19 +19,14 @@
package org.apache.druid.benchmark.query; package org.apache.druid.benchmark.query;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.common.config.NullHandling; import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.segment.Segment; import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
@ -53,29 +48,5 @@ public class QueryBenchmarkUtil
); );
} }
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator() public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> {};
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new QueryRunner<T>() {
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}
};
}
};
}
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher()
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
} }

View File

@ -373,10 +373,7 @@ public class SearchBenchmark
final SearchQueryConfig config = new SearchQueryConfig().withOverrides(query); final SearchQueryConfig config = new SearchQueryConfig().withOverrides(query);
factory = new SearchQueryRunnerFactory( factory = new SearchQueryRunnerFactory(
new SearchStrategySelector(Suppliers.ofInstance(config)), new SearchStrategySelector(Suppliers.ofInstance(config)),
new SearchQueryQueryToolChest( new SearchQueryQueryToolChest(config),
config,
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );
} }

View File

@ -296,9 +296,7 @@ public class TimeseriesBenchmark
} }
factory = new TimeseriesQueryRunnerFactory( factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );

View File

@ -279,10 +279,7 @@ public class TopNBenchmark
0, 0,
Integer.MAX_VALUE Integer.MAX_VALUE
), ),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );
} }

View File

@ -230,10 +230,7 @@ public class TimeCompareBenchmark
0, 0,
Integer.MAX_VALUE Integer.MAX_VALUE
), ),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()
),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );
} }
@ -274,7 +271,7 @@ public class TimeCompareBenchmark
timeseriesQuery = timeseriesQueryBuilder.build(); timeseriesQuery = timeseriesQueryBuilder.build();
timeseriesFactory = new TimeseriesQueryRunnerFactory( timeseriesFactory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryBenchmarkUtil.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryBenchmarkUtil.NOOP_QUERYWATCHER QueryBenchmarkUtil.NOOP_QUERYWATCHER
); );

View File

@ -1505,7 +1505,7 @@ Druid broker can optionally retry queries internally for transient errors.
##### Processing ##### Processing
The broker uses processing configs for nested groupBy queries. And, if you use groupBy v1, long-interval queries (of any type) can be broken into shorter interval queries and processed in parallel inside this thread pool. For more details, see "chunkPeriod" in the [query context](../querying/query-context.md) doc. The broker uses processing configs for nested groupBy queries.
|Property|Description|Default| |Property|Description|Default|
|--------|-----------|-------| |--------|-----------|-------|

View File

@ -123,11 +123,6 @@ Latest default metrics mapping can be found [here] (https://github.com/apache/dr
"type": "timer", "type": "timer",
"timeUnit": "MILLISECONDS" "timeUnit": "MILLISECONDS"
}, },
"query/intervalChunk/time": {
"dimensions": [],
"type": "timer",
"timeUnit": "MILLISECONDS"
},
"query/segment/time": { "query/segment/time": {
"dimensions": [], "dimensions": [],
"type": "timer", "type": "timer",

View File

@ -144,14 +144,6 @@ On the Broker, the amount of direct memory needed depends on how many merge buff
- `druid.processing.numThreads`: set this to 1 (the minimum allowed) - `druid.processing.numThreads`: set this to 1 (the minimum allowed)
- `druid.processing.numMergeBuffers`: set this to the same value as on Historicals or a bit higher - `druid.processing.numMergeBuffers`: set this to the same value as on Historicals or a bit higher
##### Note on the deprecated `chunkPeriod`
There is one exception to the Broker not needing processing threads and processing buffers:
If the deprecated `chunkPeriod` property in the [query context](../querying/query-context.md) is set, GroupBy V1 queries will use processing threads and processing buffers on the Broker.
Both `chunkPeriod` and GroupBy V1 are deprecated (use GroupBy V2 instead) and will be removed in the future, we do not recommend using them. The presence of the deprecated `chunkPeriod` feature is why a minimum of 1 processing thread must be configured, even if it's unused.
#### Connection pool sizing #### Connection pool sizing
Please see the [General Connection Pool Guidelines](#connection-pool) section for an overview of connection pool configuration. Please see the [General Connection Pool Guidelines](#connection-pool) section for an overview of connection pool configuration.

View File

@ -54,7 +54,6 @@ Available Metrics
|`query/node/bytes`|number of bytes returned from querying individual historical/realtime processes.|id, status, server.| | |`query/node/bytes`|number of bytes returned from querying individual historical/realtime processes.|id, status, server.| |
|`query/node/ttfb`|Time to first byte. Milliseconds elapsed until Broker starts receiving the response from individual historical/realtime processes.|id, status, server.|< 1s| |`query/node/ttfb`|Time to first byte. Milliseconds elapsed until Broker starts receiving the response from individual historical/realtime processes.|id, status, server.|< 1s|
|`query/node/backpressure`|Milliseconds that the channel to this process has spent suspended due to backpressure.|id, status, server.| | |`query/node/backpressure`|Milliseconds that the channel to this process has spent suspended due to backpressure.|id, status, server.| |
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk. This metric is deprecated and will be removed in the future because interval chunking is deprecated. See [Query Context](../querying/query-context.md).|id, status, chunkInterval (if interval chunking is enabled).|< 1s|
|`query/count`|number of total queries|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/count`|number of total queries|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/success/count`|number of queries successfully processed|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/success/count`|number of queries successfully processed|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.|| |`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||

View File

@ -257,8 +257,6 @@ by using a finite-sized merge buffer pool. By default, the number of merge buffe
threads. You can adjust this as necessary to balance concurrency and memory usage. threads. You can adjust this as necessary to balance concurrency and memory usage.
- groupBy v1 supports caching on either the Broker or Historical processes, whereas groupBy v2 only supports caching on - groupBy v1 supports caching on either the Broker or Historical processes, whereas groupBy v2 only supports caching on
Historical processes. Historical processes.
- groupBy v1 supports using [chunkPeriod](query-context.html) to parallelize merging on the Broker, whereas groupBy v2
ignores chunkPeriod.
- groupBy v2 supports both array-based aggregation and hash-based aggregation. The array-based aggregation is used only - groupBy v2 supports both array-based aggregation and hash-based aggregation. The array-based aggregation is used only
when the grouping key is a single indexed string column. In array-based aggregation, the dictionary-encoded value is used when the grouping key is a single indexed string column. In array-based aggregation, the dictionary-encoded value is used
as the index, so the aggregated values in the array can be accessed directly without finding buckets based on hashing. as the index, so the aggregated values in the array can be accessed directly without finding buckets based on hashing.

View File

@ -36,7 +36,6 @@ The query context is used for various query configuration parameters. The follow
|populateResultLevelCache | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateResultLevelCache` to determine whether or not to save the results of this query to the result-level query cache | |populateResultLevelCache | `true` | Flag indicating whether to save the results of the query to the result level cache. Primarily used for debugging. When set to false, it disables saving the results of this query to the query cache. When set to true, Druid uses `druid.broker.cache.populateResultLevelCache` to determine whether or not to save the results of this query to the result-level query cache |
|bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from | |bySegment | `false` | Return "by segment" results. Primarily used for debugging, setting it to `true` returns results associated with the data segment they came from |
|finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` | |finalize | `true` | Flag indicating whether to "finalize" aggregation results. Primarily used for debugging. For instance, the `hyperUnique` aggregator will return the full HyperLogLog sketch instead of the estimated cardinality when this flag is set to `false` |
|chunkPeriod | `P0D` (off) | At the Broker process level, long interval queries (of any type) may be broken into shorter interval queries to parallelize merging more than normal. Broken up queries will use a larger share of cluster resources, but, if you use groupBy "v1, it may be able to complete faster as a result. Use ISO 8601 periods. For example, if this property is set to `P1M` (one month), then a query covering a year would be broken into 12 smaller queries. The broker uses its query processing executor service to initiate processing for query chunks, so make sure `druid.processing.numThreads` is configured appropriately on the broker. [groupBy queries](groupbyquery.html) do not support chunkPeriod by default, although they do if using the legacy "v1" engine. This context is deprecated since it's only useful for groupBy "v1", and will be removed in the future releases.|
|maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.html#broker) for more details.| |maxScatterGatherBytes| `druid.server.http.maxScatterGatherBytes` | Maximum number of bytes gathered from data processes such as Historicals and realtime processes to execute a query. This parameter can be used to further reduce `maxScatterGatherBytes` limit at query time. See [Broker configuration](../configuration/index.html#broker) for more details.|
|maxQueuedBytes | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.| |maxQueuedBytes | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.|
|serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process| |serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process|

View File

@ -71,7 +71,6 @@ public class WhiteListBasedDruidToTimelineEventConverterTest
"query/time, true", "query/time, true",
"query/node/ttfb, true", "query/node/ttfb, true",
"query/segmentAndCache/time, true", "query/segmentAndCache/time, true",
"query/intervalChunk/time, false",
"query/time/balaba, true", "query/time/balaba, true",
"query/tim, false", "query/tim, false",
"segment/added/bytes, true", "segment/added/bytes, true",

View File

@ -28,11 +28,6 @@
"type": "timer", "type": "timer",
"timeUnit": "MILLISECONDS" "timeUnit": "MILLISECONDS"
}, },
"query/intervalChunk/time": {
"dimensions": [],
"type": "timer",
"timeUnit": "MILLISECONDS"
},
"query/segment/time": { "query/segment/time": {
"dimensions": [], "dimensions": [],
"type": "timer", "type": "timer",

View File

@ -75,7 +75,6 @@ public class WhiteListBasedConverterTest
"query/time, true", "query/time, true",
"query/node/ttfb, true", "query/node/ttfb, true",
"query/segmentAndCache/time, true", "query/segmentAndCache/time, true",
"query/intervalChunk/time, false",
"query/time/balaba, true", "query/time/balaba, true",
"query/tim, false", "query/tim, false",
"segment/added/bytes, false", "segment/added/bytes, false",

View File

@ -55,12 +55,7 @@ public class MaterializedViewQueryQueryToolChestTest
QueryToolChest materializedViewQueryQueryToolChest = QueryToolChest materializedViewQueryQueryToolChest =
new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse( new MaterializedViewQueryQueryToolChest(new MapQueryToolChestWarehouse(
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder() ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put( .put(TimeseriesQuery.class, new TimeseriesQueryQueryToolChest())
TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
)
.build() .build()
)); ));

View File

@ -16,9 +16,6 @@
"query/node/ttfb": [ "query/node/ttfb": [
"server" "server"
], ],
"query/intervalChunk/time": [
"chunkInterval"
],
"query/success/count": [], "query/success/count": [],
"query/failed/count": [], "query/failed/count": [],
"query/interrupted/count": [], "query/interrupted/count": [],

View File

@ -5,7 +5,6 @@
"query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer"}, "query/node/ttfb" : { "dimensions" : ["server"], "type" : "timer"},
"query/node/bytes" : { "dimensions" : ["server"], "type" : "count"}, "query/node/bytes" : { "dimensions" : ["server"], "type" : "count"},
"query/node/backpressure": { "dimensions" : ["server"], "type" : "timer"}, "query/node/backpressure": { "dimensions" : ["server"], "type" : "timer"},
"query/intervalChunk/time" : { "dimensions" : [], "type" : "timer"},
"query/segment/time" : { "dimensions" : [], "type" : "timer"}, "query/segment/time" : { "dimensions" : [], "type" : "timer"},
"query/wait/time" : { "dimensions" : [], "type" : "timer"}, "query/wait/time" : { "dimensions" : [], "type" : "timer"},

View File

@ -110,10 +110,7 @@ public class MapVirtualColumnGroupByTest extends InitializedNullHandlingTest
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory( final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
runner = QueryRunnerTestHelper.makeQueryRunner( runner = QueryRunnerTestHelper.makeQueryRunner(

View File

@ -67,10 +67,7 @@ public class MapVirtualColumnTopNTest extends InitializedNullHandlingTest
final TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory( final TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory(
new StupidPool<>("map-virtual-column-test", () -> ByteBuffer.allocate(1024)), new StupidPool<>("map-virtual-column-test", () -> ByteBuffer.allocate(1024)),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -79,20 +79,14 @@ public class ApproximateHistogramTopNQueryTest extends InitializedNullHandlingTe
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
defaultPool, defaultPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
), ),
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
customPool, customPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )

View File

@ -79,20 +79,14 @@ public class FixedBucketsHistogramTopNQueryTest extends InitializedNullHandlingT
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
defaultPool, defaultPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
), ),
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
customPool, customPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )

View File

@ -87,13 +87,10 @@ import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQuery;
@ -2508,24 +2505,12 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
private QueryRunnerFactoryConglomerate makeTimeseriesAndScanConglomerate() private QueryRunnerFactoryConglomerate makeTimeseriesAndScanConglomerate()
{ {
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,
null,
null
)
{
@Override
public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return delegate;
}
};
return new DefaultQueryRunnerFactoryConglomerate( return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder() ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put( .put(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(queryRunnerDecorator), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
(query, future) -> { (query, future) -> {
// do nothing // do nothing

View File

@ -88,11 +88,7 @@ import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.SelectorDimFilter; import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesQuery;
@ -2753,26 +2749,11 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate() private QueryRunnerFactoryConglomerate makeTimeseriesOnlyConglomerate()
{ {
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,
null,
null
)
{
@Override
public <T> QueryRunner<T> decorate(
QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return delegate;
}
};
return new DefaultQueryRunnerFactoryConglomerate( return new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.of( ImmutableMap.of(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(queryRunnerDecorator), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
(query, future) -> { (query, future) -> {
// do nothing // do nothing

View File

@ -136,10 +136,7 @@ public class VarianceTopNQueryTest extends InitializedNullHandlingTest
TopNQuery query TopNQuery query
) )
{ {
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest( final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(new TopNQueryConfig());
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner); final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner);
final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(QueryPlus.wrap(query)); final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(QueryPlus.wrap(query));
TestHelper.assertExpectedResults(expectedResults, retval); TestHelper.assertExpectedResults(expectedResults, retval);

View File

@ -95,12 +95,8 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result; import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
@ -1530,23 +1526,11 @@ public class AppenderatorDriverRealtimeIndexTaskTest
taskActionToolbox, taskActionToolbox,
new TaskAuditLogConfig(false) new TaskAuditLogConfig(false)
); );
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,
null,
null
)
{
@Override
public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return delegate;
}
};
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate( final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.of( ImmutableMap.of(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(queryRunnerDecorator), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
(query, future) -> { (query, future) -> {
// do nothing // do nothing

View File

@ -79,12 +79,9 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.EntryExistsException; import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.Druids; import org.apache.druid.query.Druids;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.Result; import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SegmentDescriptor;
@ -902,23 +899,11 @@ public class RealtimeIndexTaskTest
taskActionToolbox, taskActionToolbox,
new TaskAuditLogConfig(false) new TaskAuditLogConfig(false)
); );
IntervalChunkingQueryRunnerDecorator queryRunnerDecorator = new IntervalChunkingQueryRunnerDecorator(
null,
null,
null
)
{
@Override
public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return delegate;
}
};
final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate( final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.of( ImmutableMap.of(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(queryRunnerDecorator), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
new QueryWatcher() new QueryWatcher()
{ {

View File

@ -156,12 +156,6 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
setDimension("segment", segmentIdentifier); setDimension("segment", segmentIdentifier);
} }
@Override
public void chunkInterval(Interval interval)
{
setDimension("chunkInterval", interval.toString());
}
@Override @Override
public void preFilters(List<Filter> preFilters) public void preFilters(List<Filter> preFilters)
{ {
@ -228,13 +222,6 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
return reportMillisTimeMetric("query/segmentAndCache/time", timeNs); return reportMillisTimeMetric("query/segmentAndCache/time", timeNs);
} }
@Deprecated
@Override
public QueryMetrics<QueryType> reportIntervalChunkTime(long timeNs)
{
return reportMillisTimeMetric("query/intervalChunk/time", timeNs);
}
@Override @Override
public QueryMetrics<QueryType> reportCpuTime(long timeNs) public QueryMetrics<QueryType> reportCpuTime(long timeNs)
{ {

View File

@ -1,161 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* This class is deprecated and will removed in the future.
* See https://github.com/apache/druid/pull/4004#issuecomment-284171911 for details about deprecation.
*/
@Deprecated
public class IntervalChunkingQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> baseRunner;
private final QueryToolChest<T, Query<T>> toolChest;
private final ExecutorService executor;
private final QueryWatcher queryWatcher;
private final ServiceEmitter emitter;
public IntervalChunkingQueryRunner(
QueryRunner<T> baseRunner,
QueryToolChest<T, Query<T>> toolChest,
ExecutorService executor,
QueryWatcher queryWatcher,
ServiceEmitter emitter
)
{
this.baseRunner = baseRunner;
this.toolChest = toolChest;
this.executor = executor;
this.queryWatcher = queryWatcher;
this.emitter = emitter;
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
final Period chunkPeriod = getChunkPeriod(queryPlus.getQuery());
// Check for non-empty chunkPeriod, avoiding toStandardDuration since that cannot handle periods like P1M.
if (DateTimes.EPOCH.plus(chunkPeriod).getMillis() == DateTimes.EPOCH.getMillis()) {
return baseRunner.run(queryPlus, responseContext);
}
List<Interval> chunkIntervals = Lists.newArrayList(
FunctionalIterable
.create(queryPlus.getQuery().getIntervals())
.transformCat(
new Function<Interval, Iterable<Interval>>()
{
@Override
public Iterable<Interval> apply(Interval input)
{
return splitInterval(input, chunkPeriod);
}
}
)
);
if (chunkIntervals.size() <= 1) {
return baseRunner.run(queryPlus, responseContext);
}
return Sequences.concat(
Lists.newArrayList(
FunctionalIterable.create(chunkIntervals).transform(
new Function<Interval, Sequence<T>>()
{
@Override
public Sequence<T> apply(Interval singleInterval)
{
return new AsyncQueryRunner<T>(
//Note: it is assumed that toolChest.mergeResults(..) gives a query runner that is
//not lazy i.e. it does most of its work on call to run() method
toolChest.mergeResults(
new MetricsEmittingQueryRunner<T>(
emitter,
toolChest,
baseRunner,
QueryMetrics::reportIntervalChunkTime,
queryMetrics -> queryMetrics.chunkInterval(singleInterval)
).withWaitMeasuredFromNow()
),
executor, queryWatcher
).run(
queryPlus.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(Collections.singletonList(singleInterval))),
responseContext
);
}
}
)
)
);
}
private static Iterable<Interval> splitInterval(Interval interval, Period period)
{
if (interval.getEndMillis() == interval.getStartMillis()) {
return Collections.singletonList(interval);
}
List<Interval> intervals = new ArrayList<>();
Iterator<Interval> timestamps = new PeriodGranularity(period, null, null).getIterable(interval).iterator();
DateTime start = DateTimes.max(timestamps.next().getStart(), interval.getStart());
while (timestamps.hasNext()) {
DateTime end = timestamps.next().getStart();
intervals.add(new Interval(start, end));
start = end;
}
if (start.compareTo(interval.getEnd()) < 0) {
intervals.add(new Interval(start, interval.getEnd()));
}
return intervals;
}
private Period getChunkPeriod(Query<T> query)
{
final String p = QueryContexts.getChunkPeriod(query);
return Period.parse(p);
}
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Processing;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import java.util.concurrent.ExecutorService;
/**
* This class is deprecated and will removed in the future.
* See https://github.com/apache/druid/pull/4004#issuecomment-284171911 for details about deprecation.
*/
@Deprecated
public class IntervalChunkingQueryRunnerDecorator
{
private final ExecutorService executor;
private final QueryWatcher queryWatcher;
private final ServiceEmitter emitter;
@Inject
public IntervalChunkingQueryRunnerDecorator(@Processing ExecutorService executor, QueryWatcher queryWatcher,
ServiceEmitter emitter)
{
this.executor = executor;
this.queryWatcher = queryWatcher;
this.emitter = emitter;
}
@PublicApi
public <T> QueryRunner<T> decorate(QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new IntervalChunkingQueryRunner<T>(delegate, (QueryToolChest<T, Query<T>>) toolChest,
executor, queryWatcher, emitter);
}
}

View File

@ -43,9 +43,6 @@ public class QueryContexts
public static final String BROKER_PARALLEL_MERGE_INITIAL_YIELD_ROWS_KEY = "parallelMergeInitialYieldRows"; public static final String BROKER_PARALLEL_MERGE_INITIAL_YIELD_ROWS_KEY = "parallelMergeInitialYieldRows";
public static final String BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY = "parallelMergeSmallBatchRows"; public static final String BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY = "parallelMergeSmallBatchRows";
public static final String BROKER_PARALLELISM = "parallelMergeParallelism"; public static final String BROKER_PARALLELISM = "parallelMergeParallelism";
@Deprecated
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
public static final String VECTORIZE_KEY = "vectorize"; public static final String VECTORIZE_KEY = "vectorize";
public static final String VECTOR_SIZE_KEY = "vectorSize"; public static final String VECTOR_SIZE_KEY = "vectorSize";
@ -221,12 +218,6 @@ public class QueryContexts
return parseInt(query, BROKER_PARALLELISM, defaultValue); return parseInt(query, BROKER_PARALLELISM, defaultValue);
} }
@Deprecated
public static <T> String getChunkPeriod(Query<T> query)
{
return query.getContextValue(CHUNK_PERIOD_KEY, "P0D");
}
public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit) public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxScatterGatherBytesLimit)
{ {
Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY); Object obj = query.getContextValue(MAX_SCATTER_GATHER_BYTES_KEY);

View File

@ -25,7 +25,6 @@ import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.search.SearchQueryMetricsFactory; import org.apache.druid.query.search.SearchQueryMetricsFactory;
import org.joda.time.Interval;
import java.util.List; import java.util.List;
@ -224,8 +223,6 @@ public interface QueryMetrics<QueryType extends Query<?>>
void segment(String segmentIdentifier); void segment(String segmentIdentifier);
void chunkInterval(Interval interval);
void preFilters(List<Filter> preFilters); void preFilters(List<Filter> preFilters);
void postFilters(List<Filter> postFilters); void postFilters(List<Filter> postFilters);
@ -281,11 +278,6 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/ */
QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs); QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs);
/**
* Registers "interval chunk time" metric.
*/
QueryMetrics<QueryType> reportIntervalChunkTime(long timeNs);
/** /**
* Registers "cpu time" metric. * Registers "cpu time" metric.
*/ */

View File

@ -42,7 +42,6 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DataSource; import org.apache.druid.query.DataSource;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryDataSource;
@ -91,28 +90,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
public static final String GROUP_BY_MERGE_KEY = "groupByMerge"; public static final String GROUP_BY_MERGE_KEY = "groupByMerge";
private final GroupByStrategySelector strategySelector; private final GroupByStrategySelector strategySelector;
@Deprecated
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final GroupByQueryMetricsFactory queryMetricsFactory; private final GroupByQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting @VisibleForTesting
public GroupByQueryQueryToolChest( public GroupByQueryQueryToolChest(GroupByStrategySelector strategySelector)
GroupByStrategySelector strategySelector,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{ {
this(strategySelector, intervalChunkingQueryRunnerDecorator, DefaultGroupByQueryMetricsFactory.instance()); this(strategySelector, DefaultGroupByQueryMetricsFactory.instance());
} }
@Inject @Inject
public GroupByQueryQueryToolChest( public GroupByQueryQueryToolChest(
GroupByStrategySelector strategySelector, GroupByStrategySelector strategySelector,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
GroupByQueryMetricsFactory queryMetricsFactory GroupByQueryMetricsFactory queryMetricsFactory
) )
{ {
this.strategySelector = strategySelector; this.strategySelector = strategySelector;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory; this.queryMetricsFactory = queryMetricsFactory;
} }
@ -497,13 +489,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<ResultRow, GroupB
} }
} }
return strategySelector.strategize(delegateGroupByQuery) return runner.run(
.createIntervalChunkingRunner(
intervalChunkingQueryRunnerDecorator,
runner,
GroupByQueryQueryToolChest.this
)
.run(
queryPlus.withQuery(delegateGroupByQuery.withDimensionSpecs(dimensionSpecs)), queryPlus.withQuery(delegateGroupByQuery.withDimensionSpecs(dimensionSpecs)),
responseContext responseContext
); );

View File

@ -22,7 +22,6 @@ package org.apache.druid.query.groupby.strategy;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryRunnerFactory;
@ -71,15 +70,6 @@ public interface GroupByStrategy
*/ */
boolean doMergeResults(GroupByQuery query); boolean doMergeResults(GroupByQuery query);
/**
* Decorate a runner with an interval chunking decorator.
*/
QueryRunner<ResultRow> createIntervalChunkingRunner(
IntervalChunkingQueryRunnerDecorator decorator,
QueryRunner<ResultRow> runner,
GroupByQueryQueryToolChest toolChest
);
/** /**
* Runs a provided {@link QueryRunner} on a provided {@link GroupByQuery}, which is assumed to return rows that are * Runs a provided {@link QueryRunner} on a provided {@link GroupByQuery}, which is assumed to return rows that are
* properly sorted (by timestamp and dimensions) but not necessarily fully merged (that is, there may be adjacent * properly sorted (by timestamp and dimensions) but not necessarily fully merged (that is, there may be adjacent

View File

@ -33,7 +33,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.GroupByMergedQueryRunner; import org.apache.druid.query.GroupByMergedQueryRunner;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryWatcher; import org.apache.druid.query.QueryWatcher;
@ -92,16 +91,6 @@ public class GroupByStrategyV1 implements GroupByStrategy
return true; return true;
} }
@Override
public QueryRunner<ResultRow> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
final QueryRunner<ResultRow> runner,
final GroupByQueryQueryToolChest toolChest
)
{
return decorator.decorate(runner, toolChest);
}
@Override @Override
public boolean doMergeResults(final GroupByQuery query) public boolean doMergeResults(final GroupByQuery query)
{ {

View File

@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DataSource; import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException; import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig; import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
@ -58,7 +57,6 @@ import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.ResultRow; import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import org.apache.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
@ -173,21 +171,6 @@ public class GroupByStrategyV2 implements GroupByStrategy
return true; return true;
} }
@Override
public QueryRunner<ResultRow> createIntervalChunkingRunner(
final IntervalChunkingQueryRunnerDecorator decorator,
final QueryRunner<ResultRow> runner,
final GroupByQueryQueryToolChest toolChest
)
{
// No chunkPeriod-based interval chunking for groupBy v2.
// 1) It concats query chunks for consecutive intervals, which won't generate correct results.
// 2) Merging instead of concating isn't a good idea, since it requires all chunks to run simultaneously,
// which may take more resources than the cluster has.
// See also https://github.com/apache/druid/pull/4004
return runner;
}
@Override @Override
public Comparator<ResultRow> createResultComparator(Query<ResultRow> queryParam) public Comparator<ResultRow> createResultComparator(Query<ResultRow> queryParam)
{ {

View File

@ -26,7 +26,6 @@ import org.apache.druid.query.BitmapResultFactory;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.filter.Filter; import org.apache.druid.query.filter.Filter;
import org.joda.time.Interval;
import java.util.List; import java.util.List;
@ -136,12 +135,6 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
delegateQueryMetrics.segment(segmentIdentifier); delegateQueryMetrics.segment(segmentIdentifier);
} }
@Override
public void chunkInterval(Interval interval)
{
delegateQueryMetrics.chunkInterval(interval);
}
@Override @Override
public void preFilters(List<Filter> preFilters) public void preFilters(List<Filter> preFilters)
{ {
@ -208,12 +201,6 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
return delegateQueryMetrics.reportSegmentAndCacheTime(timeNs); return delegateQueryMetrics.reportSegmentAndCacheTime(timeNs);
} }
@Override
public QueryMetrics reportIntervalChunkTime(long timeNs)
{
return delegateQueryMetrics.reportIntervalChunkTime(timeNs);
}
@Override @Override
public QueryMetrics reportCpuTime(long timeNs) public QueryMetrics reportCpuTime(long timeNs)
{ {

View File

@ -34,7 +34,6 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -57,6 +56,7 @@ import java.util.Map;
import java.util.function.BinaryOperator; import java.util.function.BinaryOperator;
/** /**
*
*/ */
public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery> public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResultValue>, SearchQuery>
{ {
@ -69,28 +69,21 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}; };
private final SearchQueryConfig config; private final SearchQueryConfig config;
@Deprecated
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final SearchQueryMetricsFactory queryMetricsFactory; private final SearchQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting @VisibleForTesting
public SearchQueryQueryToolChest( public SearchQueryQueryToolChest(SearchQueryConfig config)
SearchQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{ {
this(config, intervalChunkingQueryRunnerDecorator, DefaultSearchQueryMetricsFactory.instance()); this(config, DefaultSearchQueryMetricsFactory.instance());
} }
@Inject @Inject
public SearchQueryQueryToolChest( public SearchQueryQueryToolChest(
SearchQueryConfig config, SearchQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
SearchQueryMetricsFactory queryMetricsFactory SearchQueryMetricsFactory queryMetricsFactory
) )
{ {
this.config = config; this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory; this.queryMetricsFactory = queryMetricsFactory;
} }
@ -140,8 +133,10 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
{ {
private final List<DimensionSpec> dimensionSpecs = private final List<DimensionSpec> dimensionSpecs =
query.getDimensions() != null ? query.getDimensions() : Collections.emptyList(); query.getDimensions() != null ? query.getDimensions() : Collections.emptyList();
private final List<String> dimOutputNames = dimensionSpecs.size() > 0 ? private final List<String> dimOutputNames = dimensionSpecs.size() > 0
Lists.transform(dimensionSpecs, DimensionSpec::getOutputName) : Collections.emptyList(); ?
Lists.transform(dimensionSpecs, DimensionSpec::getOutputName)
: Collections.emptyList();
@Override @Override
public boolean isCacheable(SearchQuery query, boolean willMergeRunners) public boolean isCacheable(SearchQuery query, boolean willMergeRunners)
@ -230,8 +225,10 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
final Map<String, String> outputNameMap = new HashMap<>(); final Map<String, String> outputNameMap = new HashMap<>();
if (hasOutputName(result)) { if (hasOutputName(result)) {
List<String> cachedOutputNames = (List) result.get(2); List<String> cachedOutputNames = (List) result.get(2);
Preconditions.checkArgument(cachedOutputNames.size() == dimOutputNames.size(), Preconditions.checkArgument(
"cache hit, but number of dimensions mismatch"); cachedOutputNames.size() == dimOutputNames.size(),
"cache hit, but number of dimensions mismatch"
);
needsRename = false; needsRename = false;
for (int idx = 0; idx < cachedOutputNames.size(); idx++) { for (int idx = 0; idx < cachedOutputNames.size(); idx++) {
String cachedOutputName = cachedOutputNames.get(idx); String cachedOutputName = cachedOutputNames.get(idx);
@ -324,25 +321,14 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner) public QueryRunner<Result<SearchResultValue>> preMergeQueryDecoration(final QueryRunner<Result<SearchResultValue>> runner)
{ {
return new SearchThresholdAdjustingQueryRunner( return new SearchThresholdAdjustingQueryRunner(
intervalChunkingQueryRunnerDecorator.decorate( (queryPlus, responseContext) -> {
new QueryRunner<Result<SearchResultValue>>()
{
@Override
public Sequence<Result<SearchResultValue>> run(
QueryPlus<Result<SearchResultValue>> queryPlus,
ResponseContext responseContext
)
{
SearchQuery searchQuery = (SearchQuery) queryPlus.getQuery(); SearchQuery searchQuery = (SearchQuery) queryPlus.getQuery();
if (searchQuery.getDimensionsFilter() != null) { if (searchQuery.getDimensionsFilter() != null) {
searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize()); searchQuery = searchQuery.withDimFilter(searchQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(searchQuery); queryPlus = queryPlus.withQuery(searchQuery);
} }
return runner.run(queryPlus, responseContext); return runner.run(queryPlus, responseContext);
}
}, },
this
),
config config
); );
} }

View File

@ -35,7 +35,6 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
@ -77,23 +76,17 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
{ {
}; };
@Deprecated
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final TimeseriesQueryMetricsFactory queryMetricsFactory; private final TimeseriesQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting @VisibleForTesting
public TimeseriesQueryQueryToolChest(IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator) public TimeseriesQueryQueryToolChest()
{ {
this(intervalChunkingQueryRunnerDecorator, DefaultTimeseriesQueryMetricsFactory.instance()); this(DefaultTimeseriesQueryMetricsFactory.instance());
} }
@Inject @Inject
public TimeseriesQueryQueryToolChest( public TimeseriesQueryQueryToolChest(TimeseriesQueryMetricsFactory queryMetricsFactory)
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
TimeseriesQueryMetricsFactory queryMetricsFactory
)
{ {
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory; this.queryMetricsFactory = queryMetricsFactory;
} }
@ -375,15 +368,14 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
@Override @Override
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner) public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner)
{ {
return intervalChunkingQueryRunnerDecorator.decorate( return (queryPlus, responseContext) -> {
(queryPlus, responseContext) -> {
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery(); TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery();
if (timeseriesQuery.getDimensionsFilter() != null) { if (timeseriesQuery.getDimensionsFilter() != null) {
timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize()); timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
queryPlus = queryPlus.withQuery(timeseriesQuery); queryPlus = queryPlus.withQuery(timeseriesQuery);
} }
return runner.run(queryPlus, responseContext); return runner.run(queryPlus, responseContext);
}, this); };
} }
@Override @Override

View File

@ -32,7 +32,6 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.BySegmentResultValue; import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.CacheStrategy; import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -73,28 +72,21 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}; };
private final TopNQueryConfig config; private final TopNQueryConfig config;
@Deprecated
private final IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator;
private final TopNQueryMetricsFactory queryMetricsFactory; private final TopNQueryMetricsFactory queryMetricsFactory;
@VisibleForTesting @VisibleForTesting
public TopNQueryQueryToolChest( public TopNQueryQueryToolChest(TopNQueryConfig config)
TopNQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator
)
{ {
this(config, intervalChunkingQueryRunnerDecorator, DefaultTopNQueryMetricsFactory.instance()); this(config, DefaultTopNQueryMetricsFactory.instance());
} }
@Inject @Inject
public TopNQueryQueryToolChest( public TopNQueryQueryToolChest(
TopNQueryConfig config, TopNQueryConfig config,
IntervalChunkingQueryRunnerDecorator intervalChunkingQueryRunnerDecorator,
TopNQueryMetricsFactory queryMetricsFactory TopNQueryMetricsFactory queryMetricsFactory
) )
{ {
this.config = config; this.config = config;
this.intervalChunkingQueryRunnerDecorator = intervalChunkingQueryRunnerDecorator;
this.queryMetricsFactory = queryMetricsFactory; this.queryMetricsFactory = queryMetricsFactory;
} }
@ -431,15 +423,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override @Override
public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TopNResultValue>> runner) public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TopNResultValue>> runner)
{ {
return intervalChunkingQueryRunnerDecorator.decorate( return (queryPlus, responseContext) -> {
new QueryRunner<Result<TopNResultValue>>()
{
@Override
public Sequence<Result<TopNResultValue>> run(
QueryPlus<Result<TopNResultValue>> queryPlus,
ResponseContext responseContext
)
{
TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery(); TopNQuery topNQuery = (TopNQuery) queryPlus.getQuery();
if (topNQuery.getDimensionsFilter() != null) { if (topNQuery.getDimensionsFilter() != null) {
topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize()); topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
@ -459,10 +443,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
} else { } else {
return runner.run(queryPlus.withQuery(delegateTopNQuery), responseContext); return runner.run(queryPlus.withQuery(delegateTopNQuery), responseContext);
} }
} };
},
this
);
} }
@Override @Override

View File

@ -120,11 +120,6 @@ public class DefaultQueryMetricsTest
Assert.assertEquals("query/segmentAndCache/time", actualEvent.get("metric")); Assert.assertEquals("query/segmentAndCache/time", actualEvent.get("metric"));
Assert.assertEquals(4L, actualEvent.get("value")); Assert.assertEquals(4L, actualEvent.get("value"));
queryMetrics.reportIntervalChunkTime(5000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/intervalChunk/time", actualEvent.get("metric"));
Assert.assertEquals(5L, actualEvent.get("value"));
queryMetrics.reportCpuTime(6000001).emit(serviceEmitter); queryMetrics.reportCpuTime(6000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/cpu/time", actualEvent.get("metric")); Assert.assertEquals("query/cpu/time", actualEvent.get("metric"));

View File

@ -1,110 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids.TimeseriesQueryBuilder;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
public class IntervalChunkingQueryRunnerTest
{
private IntervalChunkingQueryRunnerDecorator decorator;
private ExecutorService executors;
private QueryRunner baseRunner;
private QueryToolChest toolChest;
private final TimeseriesQueryBuilder queryBuilder;
public IntervalChunkingQueryRunnerTest()
{
queryBuilder = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")));
}
@Before
public void setup()
{
executors = EasyMock.createMock(ExecutorService.class);
ServiceEmitter emitter = EasyMock.createNiceMock(ServiceEmitter.class);
decorator = new IntervalChunkingQueryRunnerDecorator(executors,
QueryRunnerTestHelper.NOOP_QUERYWATCHER, emitter);
baseRunner = EasyMock.createMock(QueryRunner.class);
toolChest = EasyMock.createNiceMock(QueryToolChest.class);
}
@Test
public void testDefaultNoChunking()
{
QueryPlus queryPlus = QueryPlus.wrap(queryBuilder.intervals("2014/2016").build());
final ResponseContext context = ResponseContext.createEmpty();
EasyMock.expect(baseRunner.run(queryPlus, context)).andReturn(Sequences.empty());
EasyMock.replay(baseRunner);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(queryPlus, context);
EasyMock.verify(baseRunner);
}
@Test
public void testChunking()
{
Query query = queryBuilder.intervals("2015-01-01T00:00:00.000/2015-01-11T00:00:00.000").context(ImmutableMap.of("chunkPeriod", "P1D")).build();
executors.execute(EasyMock.anyObject(Runnable.class));
EasyMock.expectLastCall().times(10);
EasyMock.replay(executors);
EasyMock.replay(toolChest);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
EasyMock.verify(executors);
}
@Test
public void testChunkingOnMonths()
{
Query query = queryBuilder.intervals("2015-01-01T00:00:00.000/2015-02-11T00:00:00.000").context(ImmutableMap.of("chunkPeriod", "P1M")).build();
executors.execute(EasyMock.anyObject(Runnable.class));
EasyMock.expectLastCall().times(2);
EasyMock.replay(executors);
EasyMock.replay(toolChest);
QueryRunner runner = decorator.decorate(baseRunner, toolChest);
runner.run(QueryPlus.wrap(query), ResponseContext.createEmpty());
EasyMock.verify(executors);
}
}

View File

@ -1023,10 +1023,7 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) { try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
QueryRunnerFactory factory = new TopNQueryRunnerFactory( QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool, pool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner( QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
@ -1076,10 +1073,7 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) { try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
QueryRunnerFactory factory = new TopNQueryRunnerFactory( QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool, pool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner( QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(
@ -1136,10 +1130,7 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) { try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
QueryRunnerFactory factory = new TopNQueryRunnerFactory( QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool, pool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner( QueryRunner<Result<TopNResultValue>> runner = QueryRunnerTestHelper.makeQueryRunner(

View File

@ -26,14 +26,11 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.js.JavaScriptConfig; import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -92,7 +89,9 @@ public class QueryRunnerTestHelper
public static final Interval FULL_ON_INTERVAL = Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z"); public static final Interval FULL_ON_INTERVAL = Intervals.of("1970-01-01T00:00:00.000Z/2020-01-01T00:00:00.000Z");
public static final SegmentId SEGMENT_ID = SegmentId.of(DATA_SOURCE, FULL_ON_INTERVAL, "dummy_version", 0); public static final SegmentId SEGMENT_ID = SegmentId.of(DATA_SOURCE, FULL_ON_INTERVAL, "dummy_version", 0);
public static final UnionDataSource UNION_DATA_SOURCE = new UnionDataSource( public static final UnionDataSource UNION_DATA_SOURCE = new UnionDataSource(
Stream.of(DATA_SOURCE, DATA_SOURCE, DATA_SOURCE, DATA_SOURCE).map(TableDataSource::new).collect(Collectors.toList()) Stream.of(DATA_SOURCE, DATA_SOURCE, DATA_SOURCE, DATA_SOURCE)
.map(TableDataSource::new)
.collect(Collectors.toList())
); );
public static final Granularity DAY_GRAN = Granularities.DAY; public static final Granularity DAY_GRAN = Granularities.DAY;
@ -118,7 +117,10 @@ public class QueryRunnerTestHelper
public static final CountAggregatorFactory ROWS_COUNT = new CountAggregatorFactory("rows"); public static final CountAggregatorFactory ROWS_COUNT = new CountAggregatorFactory("rows");
public static final LongSumAggregatorFactory INDEX_LONG_SUM = new LongSumAggregatorFactory("index", INDEX_METRIC); public static final LongSumAggregatorFactory INDEX_LONG_SUM = new LongSumAggregatorFactory("index", INDEX_METRIC);
public static final LongSumAggregatorFactory TIME_LONG_SUM = new LongSumAggregatorFactory("sumtime", TIME_DIMENSION); public static final LongSumAggregatorFactory TIME_LONG_SUM = new LongSumAggregatorFactory("sumtime", TIME_DIMENSION);
public static final DoubleSumAggregatorFactory INDEX_DOUBLE_SUM = new DoubleSumAggregatorFactory("index", INDEX_METRIC); public static final DoubleSumAggregatorFactory INDEX_DOUBLE_SUM = new DoubleSumAggregatorFactory(
"index",
INDEX_METRIC
);
public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }"; public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
public static final String JS_RESET_0 = "function reset() { return 0; }"; public static final String JS_RESET_0 = "function reset() { return 0; }";
public static final JavaScriptAggregatorFactory JS_INDEX_SUM_IF_PLACEMENTISH_A = new JavaScriptAggregatorFactory( public static final JavaScriptAggregatorFactory JS_INDEX_SUM_IF_PLACEMENTISH_A = new JavaScriptAggregatorFactory(
@ -464,7 +466,10 @@ public class QueryRunnerTestHelper
segments.addAll(timeline.lookup(interval)); segments.addAll(timeline.lookup(interval));
} }
List<Sequence<T>> sequences = new ArrayList<>(); List<Sequence<T>> sequences = new ArrayList<>();
for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : toolChest.filterSegments(query, segments)) { for (TimelineObjectHolder<String, ReferenceCountingSegment> holder : toolChest.filterSegments(
query,
segments
)) {
Segment segment = holder.getObject().getChunk(0).getObject(); Segment segment = holder.getObject().getChunk(0).getObject();
QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec( QueryPlus queryPlusRunning = queryPlus.withQuerySegmentSpec(
new SpecificSegmentSpec( new SpecificSegmentSpec(
@ -486,37 +491,6 @@ public class QueryRunnerTestHelper
.applyPostMergeDecoration(); .applyPostMergeDecoration();
} }
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(
final QueryRunner<T> delegate,
QueryToolChest<T, ? extends Query<T>> toolChest
)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}
};
}
};
}
public static IntervalChunkingQueryRunnerDecorator sameThreadIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(
Execs.directExecutor(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new ServiceEmitter("dummy", "dummy", new NoopEmitter())
);
}
public static Map<String, Object> of(Object... keyvalues) public static Map<String, Object> of(Object... keyvalues)
{ {
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
@ -529,7 +503,7 @@ public class QueryRunnerTestHelper
public static TimeseriesQueryRunnerFactory newTimeseriesQueryRunnerFactory() public static TimeseriesQueryRunnerFactory newTimeseriesQueryRunnerFactory()
{ {
return new TimeseriesQueryRunnerFactory( return new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -55,10 +55,7 @@ public class TestQueryRunners
{ {
QueryRunnerFactory factory = new TopNQueryRunnerFactory( QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool, pool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(TOPN_CONFIG),
TOPN_CONFIG,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
return new FinalizeResultsQueryRunner<T>( return new FinalizeResultsQueryRunner<T>(
@ -70,8 +67,7 @@ public class TestQueryRunners
public static <T> QueryRunner<T> makeTimeSeriesQueryRunner(Segment adapter) public static <T> QueryRunner<T> makeTimeSeriesQueryRunner(Segment adapter)
{ {
QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -87,10 +83,7 @@ public class TestQueryRunners
final SearchQueryConfig config = new SearchQueryConfig(); final SearchQueryConfig config = new SearchQueryConfig();
QueryRunnerFactory factory = new SearchQueryRunnerFactory( QueryRunnerFactory factory = new SearchQueryRunnerFactory(
new SearchStrategySelector(Suppliers.ofInstance(config)), new SearchStrategySelector(Suppliers.ofInstance(config)),
new SearchQueryQueryToolChest( new SearchQueryQueryToolChest(config),
config,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
return new FinalizeResultsQueryRunner<T>( return new FinalizeResultsQueryRunner<T>(

View File

@ -177,9 +177,7 @@ public class AggregationTestHelper implements Closeable
{ {
ObjectMapper mapper = TestHelper.makeJsonMapper(); ObjectMapper mapper = TestHelper.makeJsonMapper();
TimeseriesQueryQueryToolChest toolchest = new TimeseriesQueryQueryToolChest( TimeseriesQueryQueryToolChest toolchest = new TimeseriesQueryQueryToolChest();
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
toolchest, toolchest,
@ -218,10 +216,7 @@ public class AggregationTestHelper implements Closeable
{ {
ObjectMapper mapper = TestHelper.makeJsonMapper(); ObjectMapper mapper = TestHelper.makeJsonMapper();
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest( TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
final CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>( final CloseableStupidPool<ByteBuffer> pool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool", "TopNQueryRunnerFactory-bufferPool",

View File

@ -47,7 +47,6 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig; import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -396,18 +395,12 @@ public class GroupByLimitPushDownInsufficientBufferTest
groupByFactory = new GroupByQueryRunnerFactory( groupByFactory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
tooSmallGroupByFactory = new GroupByQueryRunnerFactory( tooSmallGroupByFactory = new GroupByQueryRunnerFactory(
tooSmallStrategySelector, tooSmallStrategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(tooSmallStrategySelector)
tooSmallStrategySelector,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
} }
@ -688,23 +681,4 @@ public class GroupByLimitPushDownInsufficientBufferTest
} }
}; };
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}
};
}
};
}
} }

View File

@ -26,7 +26,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.CloseableDefaultBlockingPool; import org.apache.druid.collections.CloseableDefaultBlockingPool;
import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
@ -48,7 +47,6 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig; import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -429,18 +427,12 @@ public class GroupByLimitPushDownMultiNodeMergeTest
groupByFactory = new GroupByQueryRunnerFactory( groupByFactory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
groupByFactory2 = new GroupByQueryRunnerFactory( groupByFactory2 = new GroupByQueryRunnerFactory(
strategySelector2, strategySelector2,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector2)
strategySelector2,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
} }
@ -784,31 +776,5 @@ public class GroupByLimitPushDownMultiNodeMergeTest
); );
} }
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher() public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> {};
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}
};
}
};
}
} }

View File

@ -44,7 +44,6 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig; import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -286,10 +285,7 @@ public class GroupByMultiSegmentTest
groupByFactory = new GroupByQueryRunnerFactory( groupByFactory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
} }
@ -420,23 +416,4 @@ public class GroupByMultiSegmentTest
} }
}; };
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return delegate.run(queryPlus, responseContext);
}
};
}
};
}
} }

View File

@ -156,10 +156,7 @@ public class GroupByQueryMergeBufferTest extends InitializedNullHandlingTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
); );
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
strategySelector,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(strategySelector, toolChest); return new GroupByQueryRunnerFactory(strategySelector, toolChest);
} }

View File

@ -113,13 +113,11 @@ public class GroupByQueryQueryToolChestTest
.build(); .build();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query1); ).getCacheStrategy(query1);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
@ -177,13 +175,11 @@ public class GroupByQueryQueryToolChestTest
.build(); .build();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query1); ).getCacheStrategy(query1);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
@ -243,13 +239,11 @@ public class GroupByQueryQueryToolChestTest
.build(); .build();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query1); ).getCacheStrategy(query1);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
@ -331,13 +325,11 @@ public class GroupByQueryQueryToolChestTest
.build(); .build();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query1); ).getCacheStrategy(query1);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
@ -426,13 +418,11 @@ public class GroupByQueryQueryToolChestTest
.build(); .build();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query1); ).getCacheStrategy(query1);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
@ -493,13 +483,11 @@ public class GroupByQueryQueryToolChestTest
.build(); .build();
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy1 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query1); ).getCacheStrategy(query1);
final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest( final CacheStrategy<ResultRow, Object, GroupByQuery> strategy2 = new GroupByQueryQueryToolChest(
null, null
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
).getCacheStrategy(query2); ).getCacheStrategy(query2);
Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2))); Assert.assertTrue(Arrays.equals(strategy1.computeCacheKey(query1), strategy2.computeCacheKey(query2)));
@ -584,10 +572,7 @@ public class GroupByQueryQueryToolChestTest
.setGranularity(QueryRunnerTestHelper.DAY_GRAN) .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build(); .build();
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(null);
null,
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
);
final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper( final ObjectMapper arraysObjectMapper = toolChest.decorateObjectMapper(

View File

@ -118,10 +118,7 @@ public class GroupByQueryRunnerFailureTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
); );
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
strategySelector,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(strategySelector, toolChest); return new GroupByQueryRunnerFactory(strategySelector, toolChest);
} }

View File

@ -402,10 +402,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
); );
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest( final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(strategySelector);
strategySelector,
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
);
final Closer closer = Closer.create(); final Closer closer = Closer.create();
closer.register(bufferPool); closer.register(bufferPool);
closer.register(mergeBufferPool); closer.register(mergeBufferPool);
@ -1177,34 +1174,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(expectedResults, results, "sort-by-dimensions-first"); TestHelper.assertExpectedObjects(expectedResults, results, "sort-by-dimensions-first");
} }
@Test
public void testGroupByWithChunkPeriod()
{
GroupByQuery query = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
.setGranularity(QueryRunnerTestHelper.ALL_GRAN)
.overrideContext(ImmutableMap.of("chunkPeriod", "P1D"))
.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(query, "2011-04-01", "alias", "automotive", "rows", 2L, "idx", 282L),
makeRow(query, "2011-04-01", "alias", "business", "rows", 2L, "idx", 230L),
makeRow(query, "2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 324L),
makeRow(query, "2011-04-01", "alias", "health", "rows", 2L, "idx", 233L),
makeRow(query, "2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 5317L),
makeRow(query, "2011-04-01", "alias", "news", "rows", 2L, "idx", 235L),
makeRow(query, "2011-04-01", "alias", "premium", "rows", 6L, "idx", 5405L),
makeRow(query, "2011-04-01", "alias", "technology", "rows", 2L, "idx", 175L),
makeRow(query, "2011-04-01", "alias", "travel", "rows", 2L, "idx", 245L)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "chunk-period");
}
@Test @Test
public void testGroupByNoAggregators() public void testGroupByNoAggregators()
{ {
@ -4732,70 +4701,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedObjects(expectedResults, results, "subquery-multiple-intervals"); TestHelper.assertExpectedObjects(expectedResults, results, "subquery-multiple-intervals");
} }
@Test
public void testSubqueryWithMultipleIntervalsInOuterQueryAndChunkPeriod()
{
GroupByQuery subquery = makeQueryBuilder()
.setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
.setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
.setDimensions(new DefaultDimensionSpec("quality", "alias"))
.setDimFilter(new JavaScriptDimFilter(
"quality",
"function(dim){ return true; }",
null,
JavaScriptConfig.getEnabledInstance()
))
.setAggregatorSpecs(
QueryRunnerTestHelper.ROWS_COUNT,
new LongSumAggregatorFactory("idx", "index"),
new LongSumAggregatorFactory("indexMaxPlusTen", "indexMaxPlusTen")
)
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.overrideContext(ImmutableMap.of("chunkPeriod", "P1D"))
.build();
GroupByQuery query = makeQueryBuilder()
.setDataSource(subquery)
.setQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
ImmutableList.of(
Intervals.of("2011-04-01T00:00:00.000Z/2011-04-01T23:58:00.000Z"),
Intervals.of("2011-04-02T00:00:00.000Z/2011-04-03T00:00:00.000Z")
)
)
)
.setDimensions(new DefaultDimensionSpec("alias", "alias"))
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"), new LongSumAggregatorFactory("idx", "idx"))
.setGranularity(QueryRunnerTestHelper.DAY_GRAN)
.build();
List<ResultRow> expectedResults = Arrays.asList(
makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
makeRow(query, "2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
makeRow(query, "2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
makeRow(query, "2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
makeRow(query, "2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
makeRow(query, "2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
makeRow(query, "2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
makeRow(query, "2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
makeRow(query, "2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
makeRow(query, "2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
makeRow(query, "2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
makeRow(query, "2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
makeRow(query, "2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
makeRow(query, "2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
makeRow(query, "2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
makeRow(query, "2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
);
// Subqueries are handled by the ToolChest
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "subquery-multiple-intervals");
}
@Test @Test
public void testSubqueryWithExtractionFnInOuterQuery() public void testSubqueryWithExtractionFnInOuterQuery()
{ {

View File

@ -26,7 +26,6 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers; import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.NonBlockingPool;
@ -49,7 +48,6 @@ import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner; import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner; import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig; import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
@ -357,18 +355,12 @@ public class NestedQueryPushDownTest
groupByFactory = new GroupByQueryRunnerFactory( groupByFactory = new GroupByQueryRunnerFactory(
strategySelector, strategySelector,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector)
strategySelector,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
groupByFactory2 = new GroupByQueryRunnerFactory( groupByFactory2 = new GroupByQueryRunnerFactory(
strategySelector2, strategySelector2,
new GroupByQueryQueryToolChest( new GroupByQueryQueryToolChest(strategySelector2)
strategySelector2,
noopIntervalChunkingQueryRunnerDecorator()
)
); );
} }
@ -925,24 +917,5 @@ public class NestedQueryPushDownTest
); );
} }
public static final QueryWatcher NOOP_QUERYWATCHER = new QueryWatcher() public static final QueryWatcher NOOP_QUERYWATCHER = (query, future) -> {};
{
@Override
public void registerQuery(Query query, ListenableFuture future)
{
}
};
public static IntervalChunkingQueryRunnerDecorator noopIntervalChunkingQueryRunnerDecorator()
{
return new IntervalChunkingQueryRunnerDecorator(null, null, null)
{
@Override
public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate, QueryToolChest<T, ? extends Query<T>> toolChest)
{
return (queryPlus, responseContext) -> delegate.run(queryPlus, responseContext);
}
};
}
} }

View File

@ -78,10 +78,7 @@ public class SearchQueryRunnerTest extends InitializedNullHandlingTest
{ {
private static final Logger LOG = new Logger(SearchQueryRunnerTest.class); private static final Logger LOG = new Logger(SearchQueryRunnerTest.class);
private static final SearchQueryConfig CONFIG = new SearchQueryConfig(); private static final SearchQueryConfig CONFIG = new SearchQueryConfig();
private static final SearchQueryQueryToolChest TOOL_CHEST = new SearchQueryQueryToolChest( private static final SearchQueryQueryToolChest TOOL_CHEST = new SearchQueryQueryToolChest(CONFIG);
CONFIG,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
private static final SearchStrategySelector SELECTOR = new SearchStrategySelector(Suppliers.ofInstance(CONFIG)); private static final SearchStrategySelector SELECTOR = new SearchStrategySelector(Suppliers.ofInstance(CONFIG));
@Parameterized.Parameters(name = "{0}") @Parameterized.Parameters(name = "{0}")

View File

@ -115,10 +115,7 @@ public class SearchQueryRunnerWithCaseTest extends InitializedNullHandlingTest
{ {
return new SearchQueryRunnerFactory( return new SearchQueryRunnerFactory(
new SearchStrategySelector(Suppliers.ofInstance(config)), new SearchStrategySelector(Suppliers.ofInstance(config)),
new SearchQueryQueryToolChest( new SearchQueryQueryToolChest(config),
config,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
} }

View File

@ -61,7 +61,7 @@ public class TimeSeriesUnionQueryRunnerTest
return QueryRunnerTestHelper.cartesian( return QueryRunnerTestHelper.cartesian(
QueryRunnerTestHelper.makeUnionQueryRunners( QueryRunnerTestHelper.makeUnionQueryRunners(
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
@ -143,7 +143,7 @@ public class TimeSeriesUnionQueryRunnerTest
) )
.descending(descending) .descending(descending)
.build(); .build();
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()); QueryToolChest toolChest = new TimeseriesQueryQueryToolChest();
final List<Result<TimeseriesResultValue>> ds1 = Lists.newArrayList( final List<Result<TimeseriesResultValue>> ds1 = Lists.newArrayList(
new Result<>( new Result<>(
DateTimes.of("2011-04-02"), DateTimes.of("2011-04-02"),

View File

@ -111,7 +111,7 @@ public class TimeseriesQueryRunnerBonusTest
private List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index) private List<Result<TimeseriesResultValue>> runTimeseriesCount(IncrementalIndex index)
{ {
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -97,9 +97,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
// runners // runners
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
@ -544,9 +542,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
); );
// Must create a toolChest so we can run mergeResults (which applies grand totals). // Must create a toolChest so we can run mergeResults (which applies grand totals).
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest( QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest();
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
// Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called. // Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called.
final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner( final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(
@ -595,9 +591,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
); );
// Must create a toolChest so we can run mergeResults (which creates the zeroed-out row). // Must create a toolChest so we can run mergeResults (which creates the zeroed-out row).
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest( QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest();
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
// Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called. // Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called.
final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner( final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(
@ -2533,9 +2527,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList(); Iterable<Result<TimeseriesResultValue>> results = runner.run(QueryPlus.wrap(query)).toList();
TestHelper.assertExpectedResults(expectedResults, results); TestHelper.assertExpectedResults(expectedResults, results);
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest( QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest();
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
QueryRunner<Result<TimeseriesResultValue>> optimizedRunner = toolChest.postMergeQueryDecoration( QueryRunner<Result<TimeseriesResultValue>> optimizedRunner = toolChest.postMergeQueryDecoration(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner))); toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)));
Iterable<Result<TimeseriesResultValue>> results2 = new FinalizeResultsQueryRunner(optimizedRunner, toolChest) Iterable<Result<TimeseriesResultValue>> results2 = new FinalizeResultsQueryRunner(optimizedRunner, toolChest)
@ -2564,9 +2556,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
.build(); .build();
// Must create a toolChest so we can run mergeResults. // Must create a toolChest so we can run mergeResults.
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest( QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest();
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
// Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called. // Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called.
final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner( final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(

View File

@ -257,10 +257,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
public void testMinTopNThreshold() public void testMinTopNThreshold()
{ {
TopNQueryConfig config = new TopNQueryConfig(); TopNQueryConfig config = new TopNQueryConfig();
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest( final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(config);
config,
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) { try (CloseableStupidPool<ByteBuffer> pool = TestQueryRunners.createDefaultNonBlockingPool()) {
QueryRunnerFactory factory = new TopNQueryRunnerFactory( QueryRunnerFactory factory = new TopNQueryRunnerFactory(
pool, pool,

View File

@ -96,7 +96,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
} }
} }
), ),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
TEST_CASE_MAP.put( TEST_CASE_MAP.put(

View File

@ -161,10 +161,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
defaultPool, defaultPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )
@ -173,10 +170,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
QueryRunnerTestHelper.makeQueryRunners( QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
customPool, customPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )
@ -259,10 +253,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, ResponseContext context) private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, ResponseContext context)
{ {
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest( final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(new TopNQueryConfig());
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
final QueryRunner<Result<TopNResultValue>> mergeRunner = new FinalizeResultsQueryRunner( final QueryRunner<Result<TopNResultValue>> mergeRunner = new FinalizeResultsQueryRunner(
chest.mergeResults(runner), chest.mergeResults(runner),
chest chest
@ -921,118 +912,6 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
assertExpectedResults(expectedResults, query); assertExpectedResults(expectedResults, query);
} }
@Test
public void testTopNOverFirstLastAggregatorChunkPeriod()
{
TopNQuery query = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.MONTH_GRAN)
.dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
.metric("last")
.threshold(3)
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.aggregators(
new LongFirstAggregatorFactory("first", "index"),
new LongLastAggregatorFactory("last", "index")
)
.context(ImmutableMap.of("chunkPeriod", "P1D"))
.build();
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
DateTimes.of("2011-01-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1000L)
.put("last", 1127L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 800L)
.put("last", 943L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 100L)
.put("last", 155L)
.build()
)
)
),
new Result<>(
DateTimes.of("2011-02-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1203L)
.put("last", 1292L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 1667L)
.put("last", 1101L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 132L)
.put("last", 114L)
.build()
)
)
),
new Result<>(
DateTimes.of("2011-03-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1124L)
.put("last", 1366L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 1166L)
.put("last", 1063L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 153L)
.put("last", 125L)
.build()
)
)
),
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
.put("first", 1314L)
.put("last", 1029L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "upfront")
.put("first", 1447L)
.put("last", 780L)
.build(),
ImmutableMap.<String, Object>builder()
.put("market", "spot")
.put("first", 135L)
.put("last", 120L)
.build()
)
)
)
);
final Sequence<Result<TopNResultValue>> retval = runWithPreMergeAndMerge(query);
TestHelper.assertExpectedResults(expectedResults, retval);
}
@Test @Test
public void testTopNOverFirstLastFloatAggregatorUsingDoubleColumn() public void testTopNOverFirstLastFloatAggregatorUsingDoubleColumn()
{ {

View File

@ -70,20 +70,14 @@ public class TopNUnionQueryTest
QueryRunnerTestHelper.makeUnionQueryRunners( QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
defaultPool, defaultPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
), ),
QueryRunnerTestHelper.makeUnionQueryRunners( QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
customPool, customPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )

View File

@ -571,9 +571,7 @@ public class IndexMergerV9WithSpatialIndexTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -625,9 +623,7 @@ public class IndexMergerV9WithSpatialIndexTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -714,9 +710,7 @@ public class IndexMergerV9WithSpatialIndexTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -431,7 +431,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null); final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -528,7 +528,7 @@ public class IncrementalIndexTest extends InitializedNullHandlingTest
final List<ListenableFuture<?>> queryFutures = Lists.newArrayListWithExpectedSize(concurrentThreads); final List<ListenableFuture<?>> queryFutures = Lists.newArrayListWithExpectedSize(concurrentThreads);
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null); final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -505,8 +505,7 @@ public class SpatialFilterBonusTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -592,8 +591,7 @@ public class SpatialFilterBonusTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -685,8 +683,7 @@ public class SpatialFilterBonusTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -561,8 +561,7 @@ public class SpatialFilterTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -614,8 +613,7 @@ public class SpatialFilterTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );
@ -702,8 +700,7 @@ public class SpatialFilterTest
); );
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -344,7 +344,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
final List<ListenableFuture<?>> queryFutures = new ArrayList<>(); final List<ListenableFuture<?>> queryFutures = new ArrayList<>();
final Segment incrementalIndexSegment = new IncrementalIndexSegment(incrementalIndex, null); final Segment incrementalIndexSegment = new IncrementalIndexSegment(incrementalIndex, null);
final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory( final QueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
); );

View File

@ -75,7 +75,6 @@ import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result; import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.SegmentDescriptor;
@ -429,11 +428,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching( testQueryCaching(
runner, runner,
@ -468,11 +463,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching( testQueryCaching(
runner, runner,
@ -594,10 +585,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
getDefaultQueryRunner(),
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator())
);
testQueryCaching( testQueryCaching(
runner, runner,
@ -655,11 +643,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);
testQueryCaching( testQueryCaching(
runner, runner,
@ -699,10 +683,9 @@ public class CachingClusteredClientTest
.aggregators(AGGS) .aggregators(AGGS)
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator())
);
testQueryCaching( testQueryCaching(
runner, runner,
1, 1,
@ -775,10 +758,7 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), getDefaultQueryRunner(),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
testQueryCaching( testQueryCaching(
@ -851,10 +831,7 @@ public class CachingClusteredClientTest
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), getDefaultQueryRunner(),
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
testQueryCaching( testQueryCaching(
@ -955,10 +932,8 @@ public class CachingClusteredClientTest
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), new TopNQueryQueryToolChest( getDefaultQueryRunner(),
new TopNQueryConfig(), new TopNQueryQueryToolChest(new TopNQueryConfig())
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
testQueryCaching( testQueryCaching(
runner, runner,
@ -1027,11 +1002,10 @@ public class CachingClusteredClientTest
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), new TopNQueryQueryToolChest( getDefaultQueryRunner(),
new TopNQueryConfig(), new TopNQueryQueryToolChest(new TopNQueryConfig())
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
testQueryCaching( testQueryCaching(
runner, runner,
builder.build(), builder.build(),
@ -1127,11 +1101,10 @@ public class CachingClusteredClientTest
); );
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), new SearchQueryQueryToolChest( getDefaultQueryRunner(),
new SearchQueryConfig(), new SearchQueryQueryToolChest(new SearchQueryConfig())
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
TestHelper.assertExpectedResults( TestHelper.assertExpectedResults(
makeSearchResults( makeSearchResults(
TOP_DIM, TOP_DIM,
@ -1196,11 +1169,10 @@ public class CachingClusteredClientTest
); );
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(), new SearchQueryQueryToolChest( getDefaultQueryRunner(),
new SearchQueryConfig(), new SearchQueryQueryToolChest(new SearchQueryConfig())
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
); );
ResponseContext context = ResponseContext.createEmpty(); ResponseContext context = ResponseContext.createEmpty();
TestHelper.assertExpectedResults( TestHelper.assertExpectedResults(
makeSearchResults( makeSearchResults(
@ -1437,11 +1409,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS) .postAggregators(POST_AGGS)
.context(CONTEXT); .context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);
/* /*
For dim0 (2011-01-01/2011-01-05), the combined range is {[1,1], [222,333]}, so segments [-inf,1], [1,2], [2,3], and For dim0 (2011-01-01/2011-01-05), the combined range is {[1,1], [222,333]}, so segments [-inf,1], [1,2], [2,3], and
@ -1510,11 +1478,7 @@ public class CachingClusteredClientTest
final Interval interval2 = Intervals.of("2011-01-07/2011-01-08"); final Interval interval2 = Intervals.of("2011-01-07/2011-01-08");
final Interval interval3 = Intervals.of("2011-01-08/2011-01-09"); final Interval interval3 = Intervals.of("2011-01-08/2011-01-09");
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest());
getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
);
final DruidServer lastServer = servers[random.nextInt(servers.length)]; final DruidServer lastServer = servers[random.nextInt(servers.length)];
ServerSelector selector1 = makeMockSingleDimensionSelector(lastServer, "dim1", null, "b", 1); ServerSelector selector1 = makeMockSingleDimensionSelector(lastServer, "dim1", null, "b", 1);

View File

@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.MapQueryToolChestWarehouse; import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query; import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse; import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.groupby.GroupByQuery; import org.apache.druid.query.groupby.GroupByQuery;
@ -65,23 +64,15 @@ public final class CachingClusteredClientTestUtils
ImmutableMap.<Class<? extends Query>, QueryToolChest>builder() ImmutableMap.<Class<? extends Query>, QueryToolChest>builder()
.put( .put(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest()
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
) )
.put( .put(
TopNQuery.class, TopNQuery.class,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig())
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
) )
.put( .put(
SearchQuery.class, SearchQuery.class,
new SearchQueryQueryToolChest( new SearchQueryQueryToolChest(new SearchQueryConfig())
new SearchQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
)
) )
.put( .put(
GroupByQuery.class, GroupByQuery.class,

View File

@ -140,10 +140,7 @@ public class CachingQueryRunnerTest
.aggregators(AGGS) .aggregators(AGGS)
.granularity(Granularities.ALL); .granularity(Granularities.ALL);
QueryToolChest toolchest = new TopNQueryQueryToolChest( QueryToolChest toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig());
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
testCloseAndPopulate(expectedRes, expectedCacheRes, builder.build(), toolchest); testCloseAndPopulate(expectedRes, expectedCacheRes, builder.build(), toolchest);
testUseCache(expectedCacheRes, builder.build(), toolchest); testUseCache(expectedCacheRes, builder.build(), toolchest);
@ -189,9 +186,7 @@ public class CachingQueryRunnerTest
expectedResults = Lists.newArrayList(row1, row2); expectedResults = Lists.newArrayList(row1, row2);
} }
QueryToolChest toolChest = new TimeseriesQueryQueryToolChest( QueryToolChest toolChest = new TimeseriesQueryQueryToolChest();
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
);
testCloseAndPopulate(expectedResults, expectedResults, query, toolChest); testCloseAndPopulate(expectedResults, expectedResults, query, toolChest);
testUseCache(expectedResults, query, toolChest); testUseCache(expectedResults, query, toolChest);

View File

@ -37,7 +37,6 @@ import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory; import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.QueryRunnerTestHelper; import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory;
@ -240,13 +239,7 @@ public class AppenderatorTester implements AutoCloseable
new DefaultQueryRunnerFactoryConglomerate( new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.of( ImmutableMap.of(
TimeseriesQuery.class, new TimeseriesQueryRunnerFactory( TimeseriesQuery.class, new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest( new TimeseriesQueryQueryToolChest(),
new IntervalChunkingQueryRunnerDecorator(
queryExecutor,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
emitter
)
),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
), ),

View File

@ -610,7 +610,7 @@ public class CalciteTests
.put( .put(
TimeseriesQuery.class, TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory( new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()), new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(), new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
@ -619,10 +619,7 @@ public class CalciteTests
TopNQuery.class, TopNQuery.class,
new TopNQueryRunnerFactory( new TopNQueryRunnerFactory(
stupidPool, stupidPool,
new TopNQueryQueryToolChest( new TopNQueryQueryToolChest(new TopNQueryConfig()),
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER QueryRunnerTestHelper.NOOP_QUERYWATCHER
) )
) )

View File

@ -1125,7 +1125,6 @@ SysMonitor
TaskCountStatsMonitor TaskCountStatsMonitor
bufferCapacity bufferCapacity
bufferPoolName bufferPoolName
chunkInterval
cms cms
cpuName cpuName
cpuTime cpuTime
@ -1245,7 +1244,6 @@ TimeseriesQuery
D1 D1
D2 D2
D3 D3
chunkPeriod
druid.query.groupBy.defaultStrategy druid.query.groupBy.defaultStrategy
druid.query.groupBy.maxMergingDictionarySize druid.query.groupBy.maxMergingDictionarySize
druid.query.groupBy.maxOnDiskStorage druid.query.groupBy.maxOnDiskStorage
@ -1571,7 +1569,6 @@ allowAll
array_mod array_mod
batch_index_task batch_index_task
cgroup cgroup
chunkPeriod
classloader classloader
com.metamx com.metamx
common.runtime.properties common.runtime.properties