diff --git a/docs/content/querying/timeseriesquery.md b/docs/content/querying/timeseriesquery.md
index 08175c7f264..9a42bfa8726 100644
--- a/docs/content/querying/timeseriesquery.md
+++ b/docs/content/querying/timeseriesquery.md
@@ -56,7 +56,7 @@ There are 7 main parts to a timeseries query:
 |filter|See [Filters](../querying/filters.html)|no|
 |aggregations|See [Aggregations](../querying/aggregations.html)|no|
 |postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no|
-|context|See [Context](../querying/query-context.html)|no|
+|context|Can be used to modify query behavior, including [grand totals](#grand-totals) and [zero-filling](#zero-filling). See also [Context](../querying/query-context.html) for parameters that apply to all query types.|no|
 
 To pull it all together, the above query would return 2 data points, one for each day between 2012-01-01 and 2012-01-03, from the "sample\_datasource" table. Each data point would be the (long) sum of sample\_fieldName1, the (double) sum of sample\_fieldName2 and the (double) result of sample\_fieldName1 divided by sample\_fieldName2 for the filter set. The output looks like this:
 
@@ -73,6 +73,31 @@ To pull it all together, the above query would return 2 data points, one for eac
 ]
 ```
 
+#### Grand totals
+
+Druid can include an extra "grand totals" row as the last row of a timeseries result set. To enable this, add
+`"grandTotal" : true` to your query context. For example:
+
+```json
+{
+  "queryType": "timeseries",
+  "dataSource": "sample_datasource",
+  "intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ],
+  "granularity": "day",
+  "aggregations": [
+    { "type": "longSum", "name": "sample_name1", "fieldName": "sample_fieldName1" },
+    { "type": "doubleSum", "name": "sample_name2", "fieldName": "sample_fieldName2" }
+  ],
+  "context": {
+    "grandTotal": true
+  }
+}
+```
+
+The grand totals row will appear as the last row in the result array, and will have no timestamp. It will be the last
+row even if the query is run in "descending" mode. Post-aggregations in the grand totals row will be computed based
+upon the grand total aggregations.
+
 #### Zero-filling
 
 Timeseries queries normally fill empty interior time buckets with zeroes. For example, if you issue a "day" granularity
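A note on the documented behavior above: since the grand totals row "will have no timestamp", clients should expect a `null` timestamp field on the final row. Below is a sketch of the shape the example query's response might take with `"grandTotal": true`; the metric values are illustrative, not real output:

```json
[
  {
    "timestamp": "2012-01-01T00:00:00.000Z",
    "result": { "sample_name1": 10, "sample_name2": 12.5 }
  },
  {
    "timestamp": "2012-01-02T00:00:00.000Z",
    "result": { "sample_name1": 20, "sample_name2": 7.5 }
  },
  {
    "timestamp": null,
    "result": { "sample_name1": 30, "sample_name2": 20.0 }
  }
]
```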
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
index d1ac95cdadc..9d9dd987279 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQuery.java
@@ -45,6 +45,8 @@ import java.util.Objects;
 @JsonTypeName("timeseries")
 public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
 {
+  static final String CTX_GRAND_TOTAL = "grandTotal";
+
   private final VirtualColumns virtualColumns;
   private final DimFilter dimFilter;
   private final List<AggregatorFactory> aggregatorSpecs;
@@ -117,6 +119,11 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
     return postAggregatorSpecs;
   }
 
+  public boolean isGrandTotal()
+  {
+    return getContextBoolean(CTX_GRAND_TOTAL, false);
+  }
+
   public boolean isSkipEmptyBuckets()
   {
     return getContextBoolean("skipEmptyBuckets", false);
@@ -155,16 +162,16 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
   public String toString()
   {
     return "TimeseriesQuery{" +
-        "dataSource='" + getDataSource() + '\'' +
-        ", querySegmentSpec=" + getQuerySegmentSpec() +
-        ", descending=" + isDescending() +
-        ", virtualColumns=" + virtualColumns +
-        ", dimFilter=" + dimFilter +
-        ", granularity='" + getGranularity() + '\'' +
-        ", aggregatorSpecs=" + aggregatorSpecs +
-        ", postAggregatorSpecs=" + postAggregatorSpecs +
-        ", context=" + getContext() +
-        '}';
+           "dataSource='" + getDataSource() + '\'' +
+           ", querySegmentSpec=" + getQuerySegmentSpec() +
+           ", descending=" + isDescending() +
+           ", virtualColumns=" + virtualColumns +
+           ", dimFilter=" + dimFilter +
+           ", granularity='" + getGranularity() + '\'' +
+           ", aggregatorSpecs=" + aggregatorSpecs +
+           ", postAggregatorSpecs=" + postAggregatorSpecs +
+           ", context=" + getContext() +
+           '}';
   }
 
   @Override
@@ -181,9 +188,9 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
     }
     final TimeseriesQuery that = (TimeseriesQuery) o;
     return Objects.equals(virtualColumns, that.virtualColumns) &&
-        Objects.equals(dimFilter, that.dimFilter) &&
-        Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
-        Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
+           Objects.equals(dimFilter, that.dimFilter) &&
+           Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
+           Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
   }
 
   @Override
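The new `isGrandTotal()` accessor above delegates to `getContextBoolean`, inherited from `BaseQuery`. As a rough sketch of the assumed semantics (this is not Druid's actual `BaseQuery` code, and the class and method names here are illustrative): a missing key falls back to the default, while Boolean and String values are both accepted because contexts arrive deserialized from JSON:

```java
import java.util.Map;

public class ContextFlagSketch
{
  // Resolve a boolean query-context flag such as "grandTotal".
  static boolean getContextBoolean(Map<String, Object> context, String key, boolean defaultValue)
  {
    final Object value = context == null ? null : context.get(key);
    if (value == null) {
      return defaultValue;
    } else if (value instanceof Boolean) {
      return (Boolean) value;
    } else if (value instanceof String) {
      // JSON clients may send "true"/"false" as strings.
      return Boolean.parseBoolean((String) value);
    } else {
      throw new IllegalArgumentException("Expected a boolean for key: " + key);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(getContextBoolean(Map.of("grandTotal", true), "grandTotal", false)); // true
    System.out.println(getContextBoolean(Map.of(), "grandTotal", false));                   // false
  }
}
```

Defaulting to `false` keeps the feature opt-in, so existing queries are unaffected.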
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 6e9f0307f70..c50d2dd50f8 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -23,12 +23,14 @@ import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.guava.Sequence;
+import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.guava.nary.BinaryFn;
 import io.druid.query.CacheStrategy;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@@ -46,6 +48,8 @@ import io.druid.query.cache.CacheKeyBuilder;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -88,7 +92,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timese
       QueryRunner<Result<TimeseriesResultValue>> queryRunner
   )
   {
-    return new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(queryRunner)
+    final QueryRunner<Result<TimeseriesResultValue>> resultMergeQueryRunner = new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(
+        queryRunner)
     {
       @Override
       public Sequence<Result<TimeseriesResultValue>> doRun(
@@ -125,6 +130,71 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timese
         }
       }
     };
+
+    return new QueryRunner<Result<TimeseriesResultValue>>()
+    {
+      @Override
+      public Sequence<Result<TimeseriesResultValue>> run(
+          final QueryPlus<Result<TimeseriesResultValue>> queryPlus,
+          final Map<String, Object> responseContext
+      )
+      {
+        final TimeseriesQuery query = (TimeseriesQuery) queryPlus.getQuery();
+        final Sequence<Result<TimeseriesResultValue>> baseResults = resultMergeQueryRunner.run(
+            queryPlus.withQuery(
+                queryPlus.getQuery()
+                         .withOverriddenContext(
+                             ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)
+                         )
+            ),
+            responseContext
+        );
+
+        if (query.isGrandTotal()) {
+          // Accumulate grand totals while iterating the sequence.
+          final Object[] grandTotals = new Object[query.getAggregatorSpecs().size()];
+          final Sequence<Result<TimeseriesResultValue>> mappedSequence = Sequences.map(
+              baseResults,
+              resultValue -> {
+                for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
+                  final AggregatorFactory aggregatorFactory = query.getAggregatorSpecs().get(i);
+                  final Object value = resultValue.getValue().getMetric(aggregatorFactory.getName());
+                  if (grandTotals[i] == null) {
+                    grandTotals[i] = value;
+                  } else {
+                    grandTotals[i] = aggregatorFactory.combine(grandTotals[i], value);
+                  }
+                }
+                return resultValue;
+              }
+          );
+
+          return Sequences.concat(
+              ImmutableList.of(
+                  mappedSequence,
+                  Sequences.simple(
+                      () -> {
+                        final Map<String, Object> totalsMap = new HashMap<>();
+
+                        for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
+                          totalsMap.put(query.getAggregatorSpecs().get(i).getName(), grandTotals[i]);
+                        }
+
+                        final Result<TimeseriesResultValue> result = new Result<>(
+                            null,
+                            new TimeseriesResultValue(totalsMap)
+                        );
+
+                        return Collections.singletonList(result).iterator();
+                      }
+                  )
+              )
+          );
+        } else {
+          return baseResults;
+        }
+      }
+    };
   }
 
   @Override
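Two details of the merge logic above are worth calling out. First, the inner query runs with `grandTotal` overridden to `false`, presumably so that only the outermost `mergeResults` layer appends a totals row. Second, totals accumulate as a side effect of `Sequences.map`, so the extra row costs a single pass with no buffering of results. The following standalone sketch captures the same idea using plain Java collections in place of Druid's `Sequence`; the class and parameter names are illustrative, not Druid APIs:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class GrandTotalSketch
{
  // Fold each row's aggregates into running totals using per-aggregator
  // combine functions, then append a single timestamp-less totals row.
  static List<Map<String, Object>> appendGrandTotal(
      List<Map<String, Object>> rows,
      Map<String, BinaryOperator<Object>> combiners // aggregator name -> combine fn
  )
  {
    final Map<String, Object> totals = new LinkedHashMap<>();
    for (Map<String, Object> row : rows) {
      for (Map.Entry<String, BinaryOperator<Object>> e : combiners.entrySet()) {
        final Object value = row.get(e.getKey());
        if (value != null) {
          // Store the first value, combine subsequent ones with the running total.
          totals.merge(e.getKey(), value, e.getValue());
        }
      }
    }
    final List<Map<String, Object>> out = new ArrayList<>(rows);
    out.add(totals);
    return out;
  }

  public static void main(String[] args)
  {
    List<Map<String, Object>> rows = List.of(
        Map.of("rows", 13L, "index", 6619L),
        Map.of("rows", 13L, "index", 5827L)
    );
    Map<String, BinaryOperator<Object>> combiners = Map.of(
        "rows", (a, b) -> (Long) a + (Long) b,
        "index", (a, b) -> (Long) a + (Long) b
    );
    // Prints both day rows, then a totals row of {rows=26, index=12446}.
    appendGrandTotal(rows, combiners).forEach(System.out::println);
  }
}
```

The two input rows mirror the day rows in the test below, and the computed totals match its expected grand total of 26 rows and index 12446.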
diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java
index b2f19c95750..385a96b3e74 100644
--- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerTest.java
@@ -70,6 +70,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -389,6 +390,101 @@ public class TimeseriesQueryRunnerTest
     assertExpectedResults(expectedResults, results);
   }
 
+  @Test
+  public void testTimeseriesGrandTotal()
+  {
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource(QueryRunnerTestHelper.dataSource)
+                                  .granularity(QueryRunnerTestHelper.dayGran)
+                                  .intervals(QueryRunnerTestHelper.firstToThird)
+                                  .aggregators(
+                                      Arrays.asList(
+                                          QueryRunnerTestHelper.rowsCount,
+                                          QueryRunnerTestHelper.indexLongSum,
+                                          QueryRunnerTestHelper.qualityUniques
+                                      )
+                                  )
+                                  .postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
+                                  .descending(descending)
+                                  .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true))
+                                  .build();
+
+    List<Result<TimeseriesResultValue>> expectedResults = new ArrayList<>();
+
+    expectedResults.add(
+        new Result<>(
+            DateTimes.of("2011-04-01"),
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "rows",
+                    13L,
+                    "index",
+                    6619L,
+                    "uniques",
+                    QueryRunnerTestHelper.UNIQUES_9,
+                    QueryRunnerTestHelper.addRowsIndexConstantMetric,
+                    6633.0
+                )
+            )
+        )
+    );
+
+    expectedResults.add(
+        new Result<>(
+            DateTimes.of("2011-04-02"),
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "rows",
+                    13L,
+                    "index",
+                    5827L,
+                    "uniques",
+                    QueryRunnerTestHelper.UNIQUES_9,
+                    QueryRunnerTestHelper.addRowsIndexConstantMetric,
+                    5841.0
+                )
+            )
+        )
+    );
+
+    if (descending) {
+      Collections.reverse(expectedResults);
+    }
+
+    expectedResults.add(
+        new Result<>(
+            null,
+            new TimeseriesResultValue(
+                ImmutableMap.of(
+                    "rows",
+                    26L,
+                    "index",
+                    12446L,
+                    "uniques",
+                    QueryRunnerTestHelper.UNIQUES_9,
+                    QueryRunnerTestHelper.addRowsIndexConstantMetric,
+                    12473.0
+                )
+            )
+        )
+    );
+
+    // Must create a toolChest so we can run mergeResults (which applies grand totals).
+    QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest(
+        QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+    );
+
+    // Must be wrapped in a results finalizer to stop the runner's built-in finalizer from being called.
+    final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(
+        toolChest.mergeResults(runner),
+        toolChest
+    );
+
+    final List results = finalRunner.run(QueryPlus.wrap(query), CONTEXT).toList();
+
+    TestHelper.assertExpectedResults(expectedResults, results);
+  }
+
   @Test
   public void testTimeseriesWithVirtualColumn()
   {
diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java
index 160faee74bb..299f5c060bb 100644
--- a/processing/src/test/java/io/druid/segment/TestHelper.java
+++ b/processing/src/test/java/io/druid/segment/TestHelper.java
@@ -302,8 +302,8 @@ public class TestHelper
     // always generate exactly the same results (different merge ordering / float vs double)
     Assert.assertEquals(
         StringUtils.format("%s: timestamp", msg),
-        expected.getTimestamp().getMillis(),
-        actual.getTimestamp().getMillis()
+        expected.getTimestamp(),
+        actual.getTimestamp()
     );
     final Map<String, Object> expectedMap = ((MapBasedRow) expected).getEvent();
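The `TestHelper` change above exists because the grand-total row carries a `null` timestamp: calling `.getMillis()` on it would throw a `NullPointerException` before the assertion could compare anything, whereas `Assert.assertEquals` on the `DateTime` objects is null-safe. A minimal illustration (the class and method names here are hypothetical):

```java
import org.joda.time.DateTime;

import java.util.Objects;

public class NullTimestampCheck
{
  // Null-safe comparison, matching what Assert.assertEquals(Object, Object) does.
  static boolean sameTimestamp(DateTime expected, DateTime actual)
  {
    return Objects.equals(expected, actual); // null == null -> true, for grand-total rows
  }

  public static void main(String[] args)
  {
    System.out.println(sameTimestamp(null, null));             // true: two grand-total rows
    System.out.println(sameTimestamp(new DateTime(0L), null)); // false: data row vs. totals row
  }
}
```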