Timeseries: Add "grandTotal" option. (#5640)

* Timeseries: Add "grandTotal" option.

* Modify whitespace.

* Checkstyle workaround.
Gian Merlino 2018-04-16 18:22:19 -07:00 committed by Slim Bouguerra
parent d0b66a6af5
commit fbf3fc178e
5 changed files with 215 additions and 17 deletions


@@ -56,7 +56,7 @@ There are 7 main parts to a timeseries query:
|filter|See [Filters](../querying/filters.html)|no|
|aggregations|See [Aggregations](../querying/aggregations.html)|no|
|postAggregations|See [Post Aggregations](../querying/post-aggregations.html)|no|
-|context|See [Context](../querying/query-context.html)|no|
+|context|Can be used to modify query behavior, including [grand totals](#grand-totals) and [zero-filling](#zero-filling). See also [Context](../querying/query-context.html) for parameters that apply to all query types.|no|
To pull it all together, the above query would return 2 data points, one for each day between 2012-01-01 and 2012-01-03, from the "sample\_datasource" table. Each data point would be the (long) sum of sample\_fieldName1, the (double) sum of sample\_fieldName2 and the (double) result of sample\_fieldName1 divided by sample\_fieldName2 for the filter set. The output looks like this:
@@ -73,6 +73,31 @@ To pull it all together, the above query would return 2 data points, one for eac
]
```
#### Grand totals
Druid can include an extra "grand totals" row as the last row of a timeseries result set. To enable this, add
`"grandTotal" : true` to your query context. For example:
```json
{
  "queryType": "timeseries",
  "dataSource": "sample_datasource",
  "intervals": [ "2012-01-01T00:00:00.000/2012-01-03T00:00:00.000" ],
  "granularity": "day",
  "aggregations": [
    { "type": "longSum", "name": "sample_name1", "fieldName": "sample_fieldName1" },
    { "type": "doubleSum", "name": "sample_name2", "fieldName": "sample_fieldName2" }
  ],
  "context": {
    "grandTotal": true
  }
}
```
The grand totals row will appear as the last row in the result array, and will have no timestamp. It will be the last
row even if the query is run in "descending" mode. Post-aggregations in the grand totals row will be computed based
upon the grand total aggregations.
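With the example query above, the result array might look like the following (the metric values here are illustrative only, not taken from a real run; each row follows the standard timeseries result shape). Note that the grand totals row comes last and carries a `null` timestamp:
```json
[
  {
    "timestamp": "2012-01-01T00:00:00.000Z",
    "result": { "sample_name1": 100, "sample_name2": 20.5 }
  },
  {
    "timestamp": "2012-01-02T00:00:00.000Z",
    "result": { "sample_name1": 50, "sample_name2": 10.0 }
  },
  {
    "timestamp": null,
    "result": { "sample_name1": 150, "sample_name2": 30.5 }
  }
]
```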
#### Zero-filling
Timeseries queries normally fill empty interior time buckets with zeroes. For example, if you issue a "day" granularity


@@ -45,6 +45,8 @@ import java.util.Objects;
@JsonTypeName("timeseries")
public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
{
static final String CTX_GRAND_TOTAL = "grandTotal";
private final VirtualColumns virtualColumns;
private final DimFilter dimFilter;
private final List<AggregatorFactory> aggregatorSpecs;
@@ -117,6 +119,11 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
return postAggregatorSpecs;
}
public boolean isGrandTotal()
{
return getContextBoolean(CTX_GRAND_TOTAL, false);
}
public boolean isSkipEmptyBuckets()
{
return getContextBoolean("skipEmptyBuckets", false);
@@ -155,16 +162,16 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
public String toString()
{
return "TimeseriesQuery{" +
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", virtualColumns=" + virtualColumns +
", dimFilter=" + dimFilter +
", granularity='" + getGranularity() + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
"dataSource='" + getDataSource() + '\'' +
", querySegmentSpec=" + getQuerySegmentSpec() +
", descending=" + isDescending() +
", virtualColumns=" + virtualColumns +
", dimFilter=" + dimFilter +
", granularity='" + getGranularity() + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
", context=" + getContext() +
'}';
}
@Override
@@ -181,9 +188,9 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
}
final TimeseriesQuery that = (TimeseriesQuery) o;
return Objects.equals(virtualColumns, that.virtualColumns) &&
-          Objects.equals(dimFilter, that.dimFilter) &&
-          Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
-          Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
+           Objects.equals(dimFilter, that.dimFilter) &&
+           Objects.equals(aggregatorSpecs, that.aggregatorSpecs) &&
+           Objects.equals(postAggregatorSpecs, that.postAggregatorSpecs);
}
@Override


@@ -23,12 +23,14 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.guava.nary.BinaryFn;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@@ -46,6 +48,8 @@ import io.druid.query.cache.CacheKeyBuilder;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -88,7 +92,8 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
QueryRunner<Result<TimeseriesResultValue>> queryRunner
)
{
-    return new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(queryRunner)
+    final QueryRunner<Result<TimeseriesResultValue>> resultMergeQueryRunner = new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(
+        queryRunner)
{
@Override
public Sequence<Result<TimeseriesResultValue>> doRun(
@@ -125,6 +130,71 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
);
}
};
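    // Wrap the merging runner so that, when the query asks for it, a grand-total row
    // can be appended after the regular per-bucket results.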
return new QueryRunner<Result<TimeseriesResultValue>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> run(
final QueryPlus<Result<TimeseriesResultValue>> queryPlus,
final Map<String, Object> responseContext
)
{
final TimeseriesQuery query = (TimeseriesQuery) queryPlus.getQuery();
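        // Run the inner merging runner with grandTotal forced to false, so that only this
        // outermost mergeResults appends a totals row; nested merges must not add their own.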
final Sequence<Result<TimeseriesResultValue>> baseResults = resultMergeQueryRunner.run(
queryPlus.withQuery(
queryPlus.getQuery()
.withOverriddenContext(
ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)
)
),
responseContext
);
if (query.isGrandTotal()) {
// Accumulate grand totals while iterating the sequence.
final Object[] grandTotals = new Object[query.getAggregatorSpecs().size()];
final Sequence<Result<TimeseriesResultValue>> mappedSequence = Sequences.map(
baseResults,
resultValue -> {
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
final AggregatorFactory aggregatorFactory = query.getAggregatorSpecs().get(i);
final Object value = resultValue.getValue().getMetric(aggregatorFactory.getName());
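                // Seed the total with the first bucket's value; afterwards, fold each bucket in
                // via the aggregator's combine() semantics (e.g. addition for sums, sketch
                // union for HLL uniques).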
if (grandTotals[i] == null) {
grandTotals[i] = value;
} else {
grandTotals[i] = aggregatorFactory.combine(grandTotals[i], value);
}
}
return resultValue;
}
);
return Sequences.concat(
ImmutableList.of(
mappedSequence,
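              // The totals row is built lazily: this iterable's iterator is not created until
              // the mapped sequence above has been fully consumed, so by that point the
              // grandTotals array is completely populated.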
Sequences.simple(
() -> {
final Map<String, Object> totalsMap = new HashMap<>();
for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
totalsMap.put(query.getAggregatorSpecs().get(i).getName(), grandTotals[i]);
}
final Result<TimeseriesResultValue> result = new Result<>(
null,
new TimeseriesResultValue(totalsMap)
);
return Collections.singletonList(result).iterator();
}
)
)
);
} else {
return baseResults;
}
}
};
}
@Override


@@ -70,6 +70,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -389,6 +390,101 @@ public class TimeseriesQueryRunnerTest
assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesGrandTotal()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.dayGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum,
QueryRunnerTestHelper.qualityUniques
)
)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.descending(descending)
.context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, true))
.build();
List<Result<TimeseriesResultValue>> expectedResults = new ArrayList<>();
expectedResults.add(
new Result<>(
DateTimes.of("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.of(
"rows",
13L,
"index",
6619L,
"uniques",
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.addRowsIndexConstantMetric,
6633.0
)
)
)
);
expectedResults.add(
new Result<>(
DateTimes.of("2011-04-02"),
new TimeseriesResultValue(
ImmutableMap.of(
"rows",
13L,
"index",
5827L,
"uniques",
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.addRowsIndexConstantMetric,
5841.0
)
)
)
);
if (descending) {
Collections.reverse(expectedResults);
}
expectedResults.add(
new Result<>(
null,
new TimeseriesResultValue(
ImmutableMap.of(
"rows",
26L,
"index",
12446L,
"uniques",
QueryRunnerTestHelper.UNIQUES_9,
QueryRunnerTestHelper.addRowsIndexConstantMetric,
12473.0
)
)
)
);
// Must create a toolChest so we can run mergeResults (which applies grand totals).
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
// Must be wrapped in a results finalizer to stop the runner's built-in finalizer from being called.
final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(
toolChest.mergeResults(runner),
toolChest
);
final List results = finalRunner.run(QueryPlus.wrap(query), CONTEXT).toList();
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithVirtualColumn()
{


@@ -302,8 +302,8 @@ public class TestHelper
// always generate exactly the same results (different merge ordering / float vs double)
Assert.assertEquals(
StringUtils.format("%s: timestamp", msg),
-        expected.getTimestamp().getMillis(),
-        actual.getTimestamp().getMillis()
+        expected.getTimestamp(),
+        actual.getTimestamp()
);
final Map<String, Object> expectedMap = ((MapBasedRow) expected).getEvent();