From bc33b68b513da2a6a89e3467f7289197d323924e Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Sat, 18 Feb 2017 07:40:40 -0800 Subject: [PATCH] Use GroupBy V2 as default (#3953) * Use GroupBy V2 as default * Remove unused line * Change assert to exception propagation --- docs/content/configuration/broker.md | 2 +- docs/content/configuration/historical.md | 2 +- docs/content/configuration/realtime.md | 2 +- docs/content/querying/groupbyquery.md | 11 +--- .../io/druid/query/DruidProcessingConfig.java | 2 +- .../query/groupby/GroupByQueryConfig.java | 2 +- .../query/DruidProcessingConfigTest.java | 6 +- .../GroupByQueryRunnerFactoryTest.java | 49 ++++++++++++--- .../GroupByTimeseriesQueryRunnerTest.java | 62 ++++++++++++++++++- .../timeseries/TimeseriesQueryRunnerTest.java | 4 +- .../java/io/druid/segment/TestHelper.java | 46 ++++++++++++++ 11 files changed, 160 insertions(+), 28 deletions(-) diff --git a/docs/content/configuration/broker.md b/docs/content/configuration/broker.md index aa8f62656bf..52d7a834386 100644 --- a/docs/content/configuration/broker.md +++ b/docs/content/configuration/broker.md @@ -57,7 +57,7 @@ The broker uses processing configs for nested groupBy queries. And, optionally, |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)| |`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE| |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| -|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0| +|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. 
Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md index 9d539b50695..7ad71ae9ea0 100644 --- a/docs/content/configuration/historical.md +++ b/docs/content/configuration/historical.md @@ -56,7 +56,7 @@ Druid uses Jetty to serve HTTP requests. |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)| |`druid.processing.buffer.poolCacheMaxCount`|processing buffer pool caches the buffers for later use, this is the maximum count cache will grow to. note that pool can create more buffers than it can cache if necessary.|Integer.MAX_VALUE| |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| -|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0| +|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). 
Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`| diff --git a/docs/content/configuration/realtime.md b/docs/content/configuration/realtime.md index 28475bae5fa..ca634e6072d 100644 --- a/docs/content/configuration/realtime.md +++ b/docs/content/configuration/realtime.md @@ -41,7 +41,7 @@ The realtime node uses several of the global configs in [Configuration](../confi |--------|-----------|-------| |`druid.processing.buffer.sizeBytes`|This specifies a buffer size for the storage of intermediate results. The computation engine in both the Historical and Realtime nodes will use a scratch buffer of this size to do all of their intermediate computations off-heap. Larger values allow for more aggregations in a single pass over the data while smaller values can require more passes depending on the query that is being executed.|1073741824 (1GB)| |`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s| -|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. By default, no queries use these buffers, so the default pool size is zero.|0| +|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`| |`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)| |`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)| |`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`| diff --git a/docs/content/querying/groupbyquery.md b/docs/content/querying/groupbyquery.md index d455c27e390..fceaf2dbd87 100644 --- a/docs/content/querying/groupbyquery.md +++ b/docs/content/querying/groupbyquery.md @@ -119,7 +119,7 @@ See [Multi-value dimensions](multi-value-dimensions.html) for more details. GroupBy queries can be executed using two different strategies. 
The default strategy for a cluster is determined by the "druid.query.groupBy.defaultStrategy" runtime property on the broker. This can be overridden using "groupByStrategy" in -the query context. If neither the context field nor the property is set, the "v1" strategy will be used. +the query context. If neither the context field nor the property is set, the "v2" strategy will be used. - "v1", the default, generates per-segment results on data nodes (historical, realtime, middleManager) using a map which is partially on-heap (dimension keys and the map itself) and partially off-heap (the aggregated values). Data nodes then @@ -168,7 +168,7 @@ When using the "v1" strategy, the following runtime properties apply: |Property|Description|Default| |--------|-----------|-------| -|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1| +|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2| |`druid.query.groupBy.maxIntermediateRows`|Maximum number of intermediate rows for the per-segment grouping engine. This is a tuning parameter that does not impose a hard limit; rather, it potentially shifts merging work from the per-segment engine to the overall merging index. Queries that exceed this limit will not fail.|50000| |`druid.query.groupBy.maxResults`|Maximum number of results. Queries that exceed this limit will fail.|500000| |`druid.query.groupBy.singleThreaded`|Merge results using a single thread.|false| @@ -177,17 +177,12 @@ When using the "v2" strategy, the following runtime properties apply: |Property|Description|Default| |--------|-----------|-------| -|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v1| +|`druid.query.groupBy.defaultStrategy`|Default groupBy query strategy.|v2| |`druid.query.groupBy.bufferGrouperInitialBuckets`|Initial number of buckets in the off-heap hash table used for grouping results. Set to 0 to use a reasonable default.|0| |`druid.query.groupBy.bufferGrouperMaxLoadFactor`|Maximum load factor of the off-heap hash table used for grouping results. When the load factor exceeds this size, the table will be grown or spilled to disk. Set to 0 to use a reasonable default.|0| |`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for the string dictionary during merging. When the dictionary exceeds this size, a spill to disk will be triggered.|100000000| |`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)| -Additionally, the "v2" strategy uses merging buffers for merging. It is currently the only query implementation that -does so. By default, Druid is configured without any merging buffer pool, so to use the "v2" strategy you must also -set `druid.processing.numMergeBuffers` to some non-zero number. Furthermore, if you want to execute deeply nested groupBys, -you must set `druid.processing.numMergeBuffers` to at least 2. - This may require allocating more direct memory. The amount of direct memory needed by Druid is at least `druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. 
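To make the arithmetic concrete (an illustrative example, not part of the patch): on an 8-core machine the default `druid.processing.numThreads` is 7, so the new default `druid.processing.numMergeBuffers` is `max(2, 7 / 4) = 2` (integer division), and with the default 1GB `druid.processing.buffer.sizeBytes` the estimate comes to `1GB * (2 + 7 + 1) = 10GB` of direct memory.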
You can ensure at least this amount of direct memory is available by providing `-XX:MaxDirectMemorySize=` at the command line.
diff --git a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java
index ee7520131be..94d349fd3fb 100644
--- a/processing/src/main/java/io/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/io/druid/query/DruidProcessingConfig.java
@@ -49,7 +49,7 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
   @Config("${base_path}.numMergeBuffers")
   public int getNumMergeBuffers()
   {
-    return 0;
+    return Math.max(2, getNumThreads() / 4);
   }
 
   @Config(value = "${base_path}.columnCache.sizeBytes")
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
index 6f3c3379440..70090210ae6 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryConfig.java
@@ -37,7 +37,7 @@ public class GroupByQueryConfig
   private static final String CTX_KEY_MAX_MERGING_DICTIONARY_SIZE = "maxMergingDictionarySize";
 
   @JsonProperty
-  private String defaultStrategy = GroupByStrategySelector.STRATEGY_V1;
+  private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
 
   @JsonProperty
   private boolean singleThreaded = false;
diff --git a/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java
index 3f4c8206f58..27ddabfecad 100644
--- a/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java
+++ b/processing/src/test/java/io/druid/query/DruidProcessingConfigTest.java
@@ -47,6 +47,7 @@ public class DruidProcessingConfigTest
     } else {
       Assert.assertTrue(config.getNumThreads() < Runtime.getRuntime().availableProcessors());
     }
+    Assert.assertEquals(Math.max(2, config.getNumThreads() / 4), config.getNumMergeBuffers());
     Assert.assertEquals(0, config.columnCacheSizeBytes());
     Assert.assertFalse(config.isFifo());
     Assert.assertEquals(System.getProperty("java.io.tmpdir"), config.getTmpDir());
@@ -55,7 +56,7 @@ public class DruidProcessingConfigTest
     Properties props = new Properties();
     props.setProperty("druid.processing.buffer.sizeBytes", "1");
     props.setProperty("druid.processing.buffer.poolCacheMaxCount", "1");
-    props.setProperty("druid.processing.numThreads", "5");
+    props.setProperty("druid.processing.numThreads", "256");
     props.setProperty("druid.processing.columnCache.sizeBytes", "1");
     props.setProperty("druid.processing.fifo", "true");
     props.setProperty("druid.processing.tmpDir", "/test/path");
@@ -65,7 +66,8 @@ public class DruidProcessingConfigTest
 
     Assert.assertEquals(1, config.intermediateComputeSizeBytes());
     Assert.assertEquals(1, config.poolCacheMaxCount());
-    Assert.assertEquals(5, config.getNumThreads());
+    Assert.assertEquals(256, config.getNumThreads());
+    Assert.assertEquals(64, config.getNumMergeBuffers());
     Assert.assertEquals(1, config.columnCacheSizeBytes());
     Assert.assertTrue(config.isFifo());
     Assert.assertEquals("/test/path", config.getTmpDir());
diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
index e5b03d62a0d..a1e50b8701c 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java
+++ 
b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -19,6 +19,7 @@ package io.druid.query.groupby; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -28,8 +29,10 @@ import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.granularity.QueryGranularities; +import io.druid.java.util.common.guava.MergeSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactory; import io.druid.query.aggregation.AggregatorFactory; @@ -49,7 +52,7 @@ import org.junit.Test; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Executors; +import java.util.Map; /** */ @@ -61,15 +64,6 @@ public class GroupByQueryRunnerFactoryTest @Test public void testMergeRunnersEnsureGroupMerging() throws Exception { - QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()); - QueryRunner mergedRunner = factory.mergeRunners( - Executors.newSingleThreadExecutor(), - ImmutableList.of( - factory.createRunner(createSegment()), - factory.createRunner(createSegment()) - ) - ); - GroupByQuery query = GroupByQuery .builder() .setDataSource("xx") @@ -86,6 +80,41 @@ public class GroupByQueryRunnerFactoryTest ) .build(); + final QueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(new GroupByQueryConfig()); + + QueryRunner mergedRunner = factory.getToolchest().mergeResults( + new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + return factory.getToolchest().mergeResults( + new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + try { + return new MergeSequence( + query.getResultOrdering(), + Sequences.simple( + Arrays.asList( + factory.createRunner(createSegment()).run(query, responseContext), + factory.createRunner(createSegment()).run(query, responseContext) + ) + ) + ); + } catch (Exception e) { + Throwables.propagate(e); + return null; + } + } + } + ).run(query, responseContext); + } + } + ); + Sequence result = mergedRunner.run(query, Maps.newHashMap()); List expectedResults = Arrays.asList( diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java index 1b74bbb5332..347c0d6a515 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java @@ -20,23 +20,35 @@ package io.druid.query.groupby; import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.MoreExecutors; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; +import io.druid.granularity.QueryGranularities; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.Druids; +import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; import 
io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QueryToolChest; import io.druid.query.Result; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleMinAggregatorFactory; import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryRunnerTest; import io.druid.query.timeseries.TimeseriesResultValue; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import javax.annotation.Nullable; import java.io.IOException; +import java.util.Arrays; import java.util.Map; /** @@ -67,9 +79,18 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest public Sequence run(Query query, Map responseContext) { TimeseriesQuery tsQuery = (TimeseriesQuery) query; + QueryRunner newRunner = factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), ImmutableList.>of(input) + ); + QueryToolChest toolChest = factory.getToolchest(); + + newRunner = new FinalizeResultsQueryRunner<>( + toolChest.mergeResults(toolChest.preMergeQueryDecoration(newRunner)), + toolChest + ); return Sequences.map( - input.run( + newRunner.run( GroupByQuery.builder() .setDataSource(tsQuery.getDataSource()) .setQuerySegmentSpec(tsQuery.getQuerySegmentSpec()) @@ -112,6 +133,45 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest super(runner, false); } + // GroupBy handles timestamps differently when granularity is ALL + @Test + public void testFullOnTimeseriesMaxMin() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryGranularities.ALL) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .aggregators( + Arrays.asList( + new DoubleMaxAggregatorFactory("maxIndex", "index"), + new DoubleMinAggregatorFactory("minIndex", "index") + ) + ) + .descending(descending) + .build(); + + DateTime expectedEarliest = new DateTime("1970-01-01"); + DateTime expectedLast = new DateTime("2011-04-15"); + + Iterable> results = Sequences.toList( + runner.run(query, CONTEXT), + Lists.>newArrayList() + ); + Result result = results.iterator().next(); + + Assert.assertEquals(expectedEarliest, result.getTimestamp()); + Assert.assertFalse( + String.format("Timestamp[%s] > expectedLast[%s]", result.getTimestamp(), expectedLast), + result.getTimestamp().isAfter(expectedLast) + ); + + final TimeseriesResultValue value = result.getValue(); + + Assert.assertEquals(result.toString(), 1870.06103515625, value.getDoubleMetric("maxIndex"), 0.0); + Assert.assertEquals(result.toString(), 59.02102279663086, value.getDoubleMetric("minIndex"), 0.0); + } + + @Override public void testEmptyTimeseries() { diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java index 3f3327e7a7a..2a86f8a85ca 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java @@ -102,8 +102,8 @@ public class TimeseriesQueryRunnerTest TestHelper.assertExpectedResults(expectedResults, results); } - private final QueryRunner runner; - private final boolean descending; + protected final QueryRunner runner; + protected final boolean descending; public TimeseriesQueryRunnerTest( QueryRunner runner, boolean descending diff --git 
a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index f08045f732c..cc449afb893 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -27,6 +27,7 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Result; +import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.column.ColumnConfig; import org.junit.Assert; @@ -136,6 +137,17 @@ public class TestHelper // HACK! Special casing for groupBy assertRow(failMsg, (Row) expectedNext, (Row) next); assertRow(failMsg, (Row) expectedNext, (Row) next2); + } else if (expectedNext instanceof Result + && (((Result) expectedNext).getValue()) instanceof TimeseriesResultValue) { + // Special case for GroupByTimeseriesQueryRunnerTest to allow a floating point delta to be used + // in result comparison + assertTimeseriesResultValue(failMsg, (Result) expectedNext, (Result) next); + assertTimeseriesResultValue( + String.format("%s: Second iterator bad, multiple calls to iterator() should be safe", failMsg), + (Result) expectedNext, + (Result) next2 + ); + } else { assertResult(failMsg, (Result) expectedNext, (Result) next); assertResult( @@ -222,6 +234,40 @@ public class TestHelper Assert.assertEquals(msg, expected, actual); } + private static void assertTimeseriesResultValue(String msg, Result expected, Result actual) + { + // Custom equals check to get fuzzy comparison of numerics, useful because different groupBy strategies don't + // always generate exactly the same results (different merge ordering / float vs double) + Assert.assertEquals(String.format("%s: timestamp", msg), expected.getTimestamp(), actual.getTimestamp()); + + TimeseriesResultValue expectedVal = (TimeseriesResultValue) expected.getValue(); + TimeseriesResultValue actualVal = (TimeseriesResultValue) actual.getValue(); + + final Map expectedMap = (Map) expectedVal.getBaseObject(); + final Map actualMap = (Map) actualVal.getBaseObject(); + + Assert.assertEquals(String.format("%s: map keys", msg), expectedMap.keySet(), actualMap.keySet()); + for (final String key : expectedMap.keySet()) { + final Object expectedValue = expectedMap.get(key); + final Object actualValue = actualMap.get(key); + + if (expectedValue instanceof Float || expectedValue instanceof Double) { + Assert.assertEquals( + String.format("%s: key[%s]", msg, key), + ((Number) expectedValue).doubleValue(), + ((Number) actualValue).doubleValue(), + ((Number) expectedValue).doubleValue() * 1e-6 + ); + } else { + Assert.assertEquals( + String.format("%s: key[%s]", msg, key), + expectedValue, + actualValue + ); + } + } + } + private static void assertRow(String msg, Row expected, Row actual) { // Custom equals check to get fuzzy comparison of numerics, useful because different groupBy strategies don't
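
Reviewer's aside, not part of the patch: the special case added to TestHelper is what allows GroupByTimeseriesQueryRunnerTest to compare against the existing timeseries expectations with a floating point delta, since the two strategies can merge in different orders and mix float/double arithmetic. Below is a minimal standalone sketch of the same relative-tolerance idea; the class and method names are illustrative, not from the codebase.

public class FuzzyCompareDemo
{
  // Tolerance scales with the expected value, mirroring the expected * 1e-6 delta
  // used by assertTimeseriesResultValue above (abs() added so the sketch also
  // handles negative expected values).
  static boolean fuzzyEquals(double expected, double actual)
  {
    final double delta = Math.abs(expected) * 1e-6;
    return Math.abs(expected - actual) <= delta;
  }

  public static void main(String[] args)
  {
    // Within relative tolerance of the expected maxIndex value: prints true
    System.out.println(fuzzyEquals(1870.06103515625, 1870.0610351562502));
    // Off by far more than expected * 1e-6: prints false
    System.out.println(fuzzyEquals(59.02102279663086, 59.03));
  }
}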