mirror of https://github.com/apache/druid.git
Use GroupBy V2 as default (#3953)
* Use GroupBy V2 as default
* Remove unused line
* Change assert to exception propagation
parent
361d9d9802
commit
bc33b68b51
|
@ -57,7 +57,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally,
|
||||||
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|
||||||
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|
||||||
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|
||||||
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|
||||||
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|
||||||
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|
||||||
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
|
||||||
|
|
|
@ -56,7 +56,7 @@ Druid uses Jetty to serve HTTP requests.
|
||||||
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|
||||||
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|
|`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE|
|
||||||
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|
||||||
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|
||||||
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|
||||||
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|
||||||
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
|
||||||
|
|
|
@ -41,7 +41,7 @@ The realtime node uses several of the global configs in [Configuration](../confi
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|
|`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)|
|
||||||
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|
||||||
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0|
|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|
||||||
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|
||||||
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|
||||||
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|
||||||
|
|
|
@ -119,7 +119,7 @@ See [Multi-value dimensions](multi-value-dimensions.html) for more details.
|
||||||
|
|
||||||
GroupBy queries can be executed using two different strategies. The default strategy for a cluster is determined by the
|
GroupBy queries can be executed using two different strategies. The default strategy for a cluster is determined by the
|
||||||
"druid.query.groupBy.defaultStrategy" runtime property on the broker. This can be overridden using "groupByStrategy" in
|
"druid.query.groupBy.defaultStrategy" runtime property on the broker. This can be overridden using "groupByStrategy" in
|
||||||
the query context. If neither the context field nor the property is set, the "v1" strategy will be used.
|
the query context. If neither the context field nor the property is set, the "v2" strategy will be used.
|
||||||
|
|
||||||
- "v1", the default, generates per-segment results on data nodes (historical, realtime, middleManager) using a map which
|
- "v1", the default, generates per-segment results on data nodes (historical, realtime, middleManager) using a map which
|
||||||
is partially on-heap (dimension keys and the map itself) and partially off-heap (the aggregated values). Data nodes then
|
is partially on-heap (dimension keys and the map itself) and partially off-heap (the aggregated values). Data nodes then
|
||||||
|
@ -168,7 +168,7 @@ When using the "v1" strategy, the following runtime properties apply:
|
||||||
|
|
||||||
|Property|Description|Default|
|
|Property|Description|Default|
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1|
|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|
||||||
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|
|`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000|
|
||||||
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|
|
|`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000|
|
||||||
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
|
|`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false|
|
||||||
|
@ -177,17 +177,12 @@ When using the "v2" strategy, the following runtime properties apply:
|
||||||
|
|
||||||
|Property|Description|Default|
|
|Property|Description|Default|
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1|
|
|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2|
|
||||||
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
|
|`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0|
|
||||||
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|
|`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0|
|
||||||
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000|
|
||||||
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
|
||||||
|
|
||||||
Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that
|
|
||||||
does so. By default, Druid is configured without any merging buffer pool, so to use the "v2" strategy you must also
|
|
||||||
set `druid.processing.numMergeBuffers` to some non-zero number. Furthermore, if you want to execute deeply nested groupBys,
|
|
||||||
you must set `druid.processing.numMergeBuffers` to at least 2.
|
|
||||||
|
|
||||||
This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
|
This may require allocating more direct memory. The amount of direct memory needed by Druid is at least
|
||||||
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
|
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can
|
||||||
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
|
ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=<VALUE>` at the command
|
||||||
|
|
|
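As a rough illustration of the direct-memory lower bound quoted in the groupBy documentation above, the formula can be evaluated by hand before choosing a value for `-XX:MaxDirectMemorySize`. The sketch below is not Druid code: the class `DirectMemoryEstimate` and the method `minDirectMemoryBytes` are hypothetical names introduced only for this example, and the inputs (a 1GB buffer, 2 merge buffers, 7 processing threads) are assumed sample values.

```java
// Hypothetical helper, not part of Druid. It evaluates the documented lower bound:
// druid.processing.buffer.sizeBytes * (numMergeBuffers + numThreads + 1)
final class DirectMemoryEstimate
{
  private DirectMemoryEstimate() {}

  static long minDirectMemoryBytes(long bufferSizeBytes, int numMergeBuffers, int numThreads)
  {
    // Widen to long before multiplying: a 1GB buffer times a few buffers overflows int.
    return bufferSizeBytes * ((long) numMergeBuffers + numThreads + 1);
  }

  public static void main(String[] args)
  {
    // Assumed sample values: 1GB buffers, 2 merge buffers, 7 processing threads.
    long bytes = minDirectMemoryBytes(1073741824L, 2, 7);
    System.out.println(bytes); // 1073741824 * (2 + 7 + 1) = 10737418240
  }
}
```

With these sample values the bound is 10737418240 bytes (ten 1GB buffers), so the JVM would need at least that much direct memory.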
@ -49,7 +49,7 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
|
||||||
@Config("${base_path}.numMergeBuffers")
|
@Config("${base_path}.numMergeBuffers")
|
||||||
public int getNumMergeBuffers()
|
public int getNumMergeBuffers()
|
||||||
{
|
{
|
||||||
return 0;
|
return Math.max(2, getNumThreads() / 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Config(value = "${base_path}.columnCache.sizeBytes")
|
@Config(value = "${base_path}.columnCache.sizeBytes")
|
||||||
|
|
|
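To make the new `getNumMergeBuffers()` default concrete, here is a minimal standalone sketch of the same expression, `Math.max(2, getNumThreads() / 4)`, applied to a few thread counts. The class `MergeBufferDefault` and the method `defaultNumMergeBuffers` are hypothetical names for illustration only. The real value comes from `DruidProcessingConfig`.

```java
// Hypothetical illustration, not part of Druid. It mirrors the expression
// used by the new default: Math.max(2, getNumThreads() / 4).
final class MergeBufferDefault
{
  private MergeBufferDefault() {}

  static int defaultNumMergeBuffers(int numThreads)
  {
    // Integer division rounds down, so small thread counts fall back to the floor of 2.
    return Math.max(2, numThreads / 4);
  }

  public static void main(String[] args)
  {
    System.out.println(defaultNumMergeBuffers(5));   // 5 / 4 = 1, raised to 2
    System.out.println(defaultNumMergeBuffers(256)); // 256 / 4 = 64
  }
}
```

The second case matches the updated `DruidProcessingConfigTest`, which sets `druid.processing.numThreads` to 256 and expects 64 merge buffers.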
@ -37,7 +37,7 @@ public class GroupByQueryConfig
|
||||||
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
|
private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V1;
|
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
private boolean singleThreaded = false;
|
private boolean singleThreaded = false;
|
||||||
|
|
|
@ -47,6 +47,7 @@ public class DruidProcessingConfigTest
|
||||||
} else {
|
} else {
|
||||||
Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors());
|
Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors());
|
||||||
}
|
}
|
||||||
|
Assert.assertEquals(Math.max(2, config.getNumThreads() / 4), config.getNumMergeBuffers());
|
||||||
Assert.assertEquals(0, config.columnCacheSizeBytes());
|
Assert.assertEquals(0, config.columnCacheSizeBytes());
|
||||||
Assert.assertFalse(config.isFifo());
|
Assert.assertFalse(config.isFifo());
|
||||||
Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir());
|
Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir());
|
||||||
|
@ -55,7 +56,7 @@ public class DruidProcessingConfigTest
|
||||||
Properties props = new Properties();
|
Properties props = new Properties();
|
||||||
props.setProperty("druid.processing.buffer.sizeBytes", "1");
|
props.setProperty("druid.processing.buffer.sizeBytes", "1");
|
||||||
props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1");
|
props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1");
|
||||||
props.setProperty("druid.processing.numThreads", "5");
|
props.setProperty("druid.processing.numThreads", "256");
|
||||||
props.setProperty("druid.processing.columnCache.sizeBytes", "1");
|
props.setProperty("druid.processing.columnCache.sizeBytes", "1");
|
||||||
props.setProperty("druid.processing.fifo", "true");
|
props.setProperty("druid.processing.fifo", "true");
|
||||||
props.setProperty("druid.processing.tmpDir", "/test/path");
|
props.setProperty("druid.processing.tmpDir", "/test/path");
|
||||||
|
@ -65,7 +66,8 @@ public class DruidProcessingConfigTest
|
||||||
|
|
||||||
Assert.assertEquals(1, config.intermediateComputeSizeBytes());
|
Assert.assertEquals(1, config.intermediateComputeSizeBytes());
|
||||||
Assert.assertEquals(1, config.poolCacheMaxCount());
|
Assert.assertEquals(1, config.poolCacheMaxCount());
|
||||||
Assert.assertEquals(5, config.getNumThreads());
|
Assert.assertEquals(256, config.getNumThreads());
|
||||||
|
Assert.assertEquals(64, config.getNumMergeBuffers());
|
||||||
Assert.assertEquals(1, config.columnCacheSizeBytes());
|
Assert.assertEquals(1, config.columnCacheSizeBytes());
|
||||||
Assert.assertTrue(config.isFifo());
|
Assert.assertTrue(config.isFifo());
|
||||||
Assert.assertEquals("/test/path", config.getTmpDir());
|
Assert.assertEquals("/test/path", config.getTmpDir());
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.query.groupby;
|
package io.druid.query.groupby;
|
||||||
|
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
@ -28,8 +29,10 @@ import io.druid.data.input.impl.DimensionsSpec;
|
||||||
import io.druid.data.input.impl.StringInputRowParser;
|
import io.druid.data.input.impl.StringInputRowParser;
|
||||||
import io.druid.data.input.impl.TimestampSpec;
|
import io.druid.data.input.impl.TimestampSpec;
|
||||||
import io.druid.granularity.QueryGranularities;
|
import io.druid.granularity.QueryGranularities;
|
||||||
|
import io.druid.java.util.common.guava.MergeSequence;
|
||||||
import io.druid.java.util.common.guava.Sequence;
|
import io.druid.java.util.common.guava.Sequence;
|
||||||
import io.druid.java.util.common.guava.Sequences;
|
import io.druid.java.util.common.guava.Sequences;
|
||||||
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
import io.druid.query.QueryRunnerFactory;
|
import io.druid.query.QueryRunnerFactory;
|
||||||
import io.druid.query.aggregation.AggregatorFactory;
|
import io.druid.query.aggregation.AggregatorFactory;
|
||||||
|
@ -49,7 +52,7 @@ import org.junit.Test;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -61,15 +64,6 @@ public class GroupByQueryRunnerFactoryTest
|
||||||
@Test
|
@Test
|
||||||
public void testMergeRunnersEnsureGroupMerging() throws Exception
|
public void testMergeRunnersEnsureGroupMerging() throws Exception
|
||||||
{
|
{
|
||||||
QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());
|
|
||||||
QueryRunner mergedRunner = factory.mergeRunners(
|
|
||||||
Executors.newSingleThreadExecutor(),
|
|
||||||
ImmutableList.of(
|
|
||||||
factory.createRunner(createSegment()),
|
|
||||||
factory.createRunner(createSegment())
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
GroupByQuery query = GroupByQuery
|
GroupByQuery query = GroupByQuery
|
||||||
.builder()
|
.builder()
|
||||||
.setDataSource("xx")
|
.setDataSource("xx")
|
||||||
|
@ -86,6 +80,41 @@ public class GroupByQueryRunnerFactoryTest
|
||||||
)
|
)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
final QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig());
|
||||||
|
|
||||||
|
QueryRunner mergedRunner = factory.getToolchest().mergeResults(
|
||||||
|
new QueryRunner()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Sequence run(Query query, Map responseContext)
|
||||||
|
{
|
||||||
|
return factory.getToolchest().mergeResults(
|
||||||
|
new QueryRunner()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Sequence run(Query query, Map responseContext)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
return new MergeSequence(
|
||||||
|
query.getResultOrdering(),
|
||||||
|
Sequences.simple(
|
||||||
|
Arrays.asList(
|
||||||
|
factory.createRunner(createSegment()).run(query, responseContext),
|
||||||
|
factory.createRunner(createSegment()).run(query, responseContext)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Throwables.propagate(e);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
).run(query, responseContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
Sequence<Row> result = mergedRunner.run(query, Maps.newHashMap());
|
Sequence<Row> result = mergedRunner.run(query, Maps.newHashMap());
|
||||||
|
|
||||||
List<Row> expectedResults = Arrays.asList(
|
List<Row> expectedResults = Arrays.asList(
|
||||||
|
|
|
@ -20,23 +20,35 @@
|
||||||
package io.druid.query.groupby;
|
package io.druid.query.groupby;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import io.druid.data.input.MapBasedRow;
|
import io.druid.data.input.MapBasedRow;
|
||||||
import io.druid.data.input.Row;
|
import io.druid.data.input.Row;
|
||||||
|
import io.druid.granularity.QueryGranularities;
|
||||||
import io.druid.java.util.common.guava.Sequence;
|
import io.druid.java.util.common.guava.Sequence;
|
||||||
import io.druid.java.util.common.guava.Sequences;
|
import io.druid.java.util.common.guava.Sequences;
|
||||||
|
import io.druid.query.Druids;
|
||||||
|
import io.druid.query.FinalizeResultsQueryRunner;
|
||||||
import io.druid.query.Query;
|
import io.druid.query.Query;
|
||||||
import io.druid.query.QueryRunner;
|
import io.druid.query.QueryRunner;
|
||||||
import io.druid.query.QueryRunnerTestHelper;
|
import io.druid.query.QueryRunnerTestHelper;
|
||||||
|
import io.druid.query.QueryToolChest;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
|
import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
|
||||||
|
import io.druid.query.aggregation.DoubleMinAggregatorFactory;
|
||||||
import io.druid.query.timeseries.TimeseriesQuery;
|
import io.druid.query.timeseries.TimeseriesQuery;
|
||||||
import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
|
import io.druid.query.timeseries.TimeseriesQueryRunnerTest;
|
||||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||||
|
import org.joda.time.DateTime;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
@@ -67,9 +79,18 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
                   public Sequence run(Query query, Map responseContext)
                   {
                     TimeseriesQuery tsQuery = (TimeseriesQuery) query;
+                    QueryRunner<Row> newRunner = factory.mergeRunners(
+                        MoreExecutors.sameThreadExecutor(), ImmutableList.<QueryRunner<Row>>of(input)
+                    );
+                    QueryToolChest toolChest = factory.getToolchest();
+
+                    newRunner = new FinalizeResultsQueryRunner<>(
+                        toolChest.mergeResults(toolChest.preMergeQueryDecoration(newRunner)),
+                        toolChest
+                    );
 
                     return Sequences.map(
-                        input.run(
+                        newRunner.run(
                             GroupByQuery.builder()
                                         .setDataSource(tsQuery.getDataSource())
                                         .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
@@ -112,6 +133,45 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
     super(runner, false);
   }
 
+  // GroupBy handles timestamps differently when granularity is ALL
+  @Test
+  public void testFullOnTimeseriesMaxMin()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryGranularities.ALL)
+                                  .intervals(QueryRunnerTestHelper.fullOnInterval)
+                                  .aggregators(
+                                      Arrays.asList(
+                                          new DoubleMaxAggregatorFactory("maxIndex", "index"),
+                                          new DoubleMinAggregatorFactory("minIndex", "index")
+                                      )
+                                  )
+                                  .descending(descending)
+                                  .build();
+
+    DateTime expectedEarliest = new DateTime("1970-01-01");
+    DateTime expectedLast = new DateTime("2011-04-15");
+
+    Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
+        runner.run(query, CONTEXT),
+        Lists.<Result<TimeseriesResultValue>>newArrayList()
+    );
+    Result<TimeseriesResultValue> result = results.iterator().next();
+
+    Assert.assertEquals(expectedEarliest, result.getTimestamp());
+    Assert.assertFalse(
+        String.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast),
+        result.getTimestamp().isAfter(expectedLast)
+    );
+
+    final TimeseriesResultValue value = result.getValue();
+
+    Assert.assertEquals(result.toString(), 1870.06103515625, value.getDoubleMetric("maxIndex"), 0.0);
+    Assert.assertEquals(result.toString(), 59.02102279663086, value.getDoubleMetric("minIndex"), 0.0);
+  }
+
+
   @Override
   public void testEmptyTimeseries()
   {

@@ -102,8 +102,8 @@ public class TimeseriesQueryRunnerTest
     TestHelper.assertExpectedResults(expectedResults, results);
   }
 
-  private final QueryRunner runner;
-  private final boolean descending;
+  protected final QueryRunner runner;
+  protected final boolean descending;
 
   public TimeseriesQueryRunnerTest(
       QueryRunner runner, boolean descending

@@ -27,6 +27,7 @@ import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.query.Result;
+import io.druid.query.timeseries.TimeseriesResultValue;
 import io.druid.segment.column.ColumnConfig;
 import org.junit.Assert;
 
@@ -136,6 +137,17 @@ public class TestHelper
         // HACK! Special casing for groupBy
         assertRow(failMsg, (Row) expectedNext, (Row) next);
         assertRow(failMsg, (Row) expectedNext, (Row) next2);
+      } else if (expectedNext instanceof Result
+                 && (((Result) expectedNext).getValue()) instanceof TimeseriesResultValue) {
+        // Special case for GroupByTimeseriesQueryRunnerTest to allow a floating point delta to be used
+        // in result comparison
+        assertTimeseriesResultValue(failMsg, (Result) expectedNext, (Result) next);
+        assertTimeseriesResultValue(
+            String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg),
+            (Result) expectedNext,
+            (Result) next2
+        );
+
       } else {
         assertResult(failMsg, (Result) expectedNext, (Result) next);
         assertResult(
@@ -222,6 +234,40 @@ public class TestHelper
     Assert.assertEquals(msg, expected, actual);
   }
 
+  private static void assertTimeseriesResultValue(String msg, Result expected, Result actual)
+  {
+    // Custom equals check to get fuzzy comparison of numerics, useful because different groupBy strategies don't
+    // always generate exactly the same results (different merge ordering / float vs double)
+    Assert.assertEquals(String.format("%s: timestamp", msg), expected.getTimestamp(), actual.getTimestamp());
+
+    TimeseriesResultValue expectedVal = (TimeseriesResultValue) expected.getValue();
+    TimeseriesResultValue actualVal = (TimeseriesResultValue) actual.getValue();
+
+    final Map<String, Object> expectedMap = (Map<String, Object>) expectedVal.getBaseObject();
+    final Map<String, Object> actualMap = (Map<String, Object>) actualVal.getBaseObject();
+
+    Assert.assertEquals(String.format("%s: map keys", msg), expectedMap.keySet(), actualMap.keySet());
+    for (final String key : expectedMap.keySet()) {
+      final Object expectedValue = expectedMap.get(key);
+      final Object actualValue = actualMap.get(key);
+
+      if (expectedValue instanceof Float || expectedValue instanceof Double) {
+        Assert.assertEquals(
+            String.format("%s: key[%s]", msg, key),
+            ((Number) expectedValue).doubleValue(),
+            ((Number) actualValue).doubleValue(),
+            ((Number) expectedValue).doubleValue() * 1e-6
+        );
+      } else {
+        Assert.assertEquals(
+            String.format("%s: key[%s]", msg, key),
+            expectedValue,
+            actualValue
+        );
+      }
+    }
+  }
+
   private static void assertRow(String msg, Row expected, Row actual)
   {
     // Custom equals check to get fuzzy comparison of numerics, useful because different groupBy strategies don't
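
The fuzzy comparison added to `TestHelper` scales the allowed delta by the expected value (`expected * 1e-6`). A minimal standalone sketch of that idea follows, assuming a hypothetical helper named `closeEnough` that is not part of Druid or JUnit. It takes the absolute value of the expected number before scaling, because a delta computed from a negative expected value would itself be negative, and JUnit 4's `assertEquals(double, double, double)` would then pass only on exact equality. Whether any Druid test actually compares negative metrics this way is not shown in this diff.

```java
// Hypothetical illustration only; this class and its names do not appear in the commit.
class RelativeDeltaCheck {

    // Returns true when 'actual' is within a relative tolerance of 'expected'.
    // The tolerance is scaled by |expected| so it stays non-negative.
    static boolean closeEnough(double expected, double actual, double relativeTolerance) {
        if (Double.compare(expected, actual) == 0) {
            return true; // exact match, also covers NaN == NaN and equal infinities
        }
        double delta = Math.abs(expected) * relativeTolerance;
        return Math.abs(expected - actual) <= delta;
    }

    public static void main(String[] args) {
        // Values borrowed from the test above, perturbed slightly for illustration.
        System.out.println(closeEnough(1870.06103515625, 1870.061, 1e-6));
        System.out.println(closeEnough(-59.0, -59.00001, 1e-6));
        System.out.println(closeEnough(100.0, 100.1, 1e-6));
    }
}
```

Note that with a purely relative tolerance, an expected value of exactly zero still requires an exact match; a small absolute floor could be added if that matters for a given test.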