Timeseries results are incoherent when the query interval is out of range or the filter matches nothing (#5649)

* add some tests

Change-Id: I92180498e2e6695212b286d980e349c136c78c86

* add empty-sequence runner

Change-Id: I20c83095072bbf3b4a3a57dfc1934d528e2c7a1a

* only handle granularity ALL

Change-Id: I1d88fab500c615bc46db4f4497ce93089976441f

* move toList inside the if block and add expected queries

Change-Id: I56cdd980e44f0685806efb45e29031fa2e328ec4

* fix typo

Change-Id: I42fdd28da5471f6ae57d3962f671741b106300cd

* add tests and fix interval logic

Change-Id: I0bd414d2278e3eddc2810e4f5080e6cf6a117f12

* fix style

Change-Id: I99a2380934c9ab350ca934c56041dc343c08b99f

* address review comments

Change-Id: I726a3b905a9520d8b1db70e4ba17853c65c414a4
Committed by Slim Bouguerra on 2018-04-23 15:55:18 -07:00 via GitHub
commit 73da7426da (parent 55b003e5e8)
6 changed files with 273 additions and 147 deletions
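For orientation, a rough sketch of the behavior this patch targets (not code from the patch; the datasource and interval below are made up): a granularity-ALL timeseries query whose interval contains no data, or whose filter matches no rows, should now come back as a single zeroed-out row instead of an empty result, as long as skipEmptyBuckets is false.

    // Sketch only: the query shape mirrors the new testTimeseriesIntervalOutOfRanges test below.
    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                                  .dataSource("foo")                                      // hypothetical datasource
                                  .granularity(Granularities.ALL)
                                  .intervals("2999-01-01/3000-01-01")                     // interval past all data
                                  .aggregators(Arrays.asList(new CountAggregatorFactory("rows")))
                                  .context(ImmutableMap.of(TimeseriesQuery.SKIP_EMPTY_BUCKETS, false))
                                  .build();
    // Before this patch, mergeResults() produced an empty sequence for such a query.
    // After it, the merged sequence holds one row at the interval start with "rows" = 0.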

ChainedExecutionQueryRunner.java

@@ -19,7 +19,6 @@
 package io.druid.query;
-import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -105,45 +104,40 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
         Lists.newArrayList(
             Iterables.transform(
                 queryables,
-                new Function<QueryRunner<T>, ListenableFuture<Iterable<T>>>()
-                {
-                  @Override
-                  public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
-                  {
+                input -> {
                   if (input == null) {
                     throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
                   }
                   return exec.submit(
                       new AbstractPrioritizedCallable<Iterable<T>>(priority)
                       {
                         @Override
                         public Iterable<T> call()
                         {
                           try {
                             Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
                             if (result == null) {
                               throw new ISE("Got a null result! Segments are missing!");
                             }
                             List<T> retVal = result.toList();
                             if (retVal == null) {
                               throw new ISE("Got a null list of results! WTF?!");
                             }
                             return retVal;
                           }
                           catch (QueryInterruptedException e) {
                             throw Throwables.propagate(e);
                           }
                           catch (Exception e) {
                             log.error(e, "Exception with one of the sequences!");
                             throw Throwables.propagate(e);
                           }
                         }
                       }
                   );
-                  }
                 }
             )
         )

TimeseriesQuery.java

@@ -46,6 +46,7 @@ import java.util.Objects;
 public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
 {
   static final String CTX_GRAND_TOTAL = "grandTotal";
+  public static final String SKIP_EMPTY_BUCKETS = "skipEmptyBuckets";

   private final VirtualColumns virtualColumns;
   private final DimFilter dimFilter;
@@ -126,7 +127,7 @@ public class TimeseriesQuery extends BaseQuery<Result<TimeseriesResultValue>>
   public boolean isSkipEmptyBuckets()
   {
-    return getContextBoolean("skipEmptyBuckets", false);
+    return getContextBoolean(SKIP_EMPTY_BUCKETS, false);
   }

   @Override
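The new constant only names the context key that isSkipEmptyBuckets() already reads, so callers keep toggling the flag through the query context. A minimal sketch (variable names illustrative):

    // Build a context that asks the engine to drop empty buckets, using the new constant
    // instead of the raw "skipEmptyBuckets" string; pass it to Druids.newTimeseriesQueryBuilder().context(...).
    Map<String, Object> context = ImmutableMap.of(TimeseriesQuery.SKIP_EMPTY_BUCKETS, true);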

TimeseriesQueryQueryToolChest.java

@@ -28,6 +28,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
+import io.druid.data.input.MapBasedRow;
+import io.druid.java.util.common.DateTimes;
+import io.druid.java.util.common.granularity.Granularities;
 import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -41,10 +44,12 @@ import io.druid.query.QueryToolChest;
 import io.druid.query.Result;
 import io.druid.query.ResultGranularTimestampComparator;
 import io.druid.query.ResultMergeQueryRunner;
+import io.druid.query.aggregation.Aggregator;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.cache.CacheKeyBuilder;
+import io.druid.query.groupby.RowBasedColumnSelectorFactory;
 import org.joda.time.DateTime;

 import javax.annotation.Nullable;
@@ -131,72 +136,97 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
       }
     };

-    return new QueryRunner<Result<TimeseriesResultValue>>()
-    {
-      @Override
-      public Sequence<Result<TimeseriesResultValue>> run(
-          final QueryPlus<Result<TimeseriesResultValue>> queryPlus,
-          final Map<String, Object> responseContext
-      )
-      {
+    return (queryPlus, responseContext) -> {
       final TimeseriesQuery query = (TimeseriesQuery) queryPlus.getQuery();
       final Sequence<Result<TimeseriesResultValue>> baseResults = resultMergeQueryRunner.run(
           queryPlus.withQuery(
               queryPlus.getQuery()
                        .withOverriddenContext(
                            ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)
                        )
           ),
           responseContext
       );
+      final Sequence<Result<TimeseriesResultValue>> finalSequence;
+      if (query.getGranularity().equals(Granularities.ALL) && !query.isSkipEmptyBuckets()) {
+        // Usually it is NOT okay to materialize results via toList(), but granularity is ALL, so there is only one record.
+        final List<Result<TimeseriesResultValue>> val = baseResults.toList();
+        finalSequence = val.isEmpty()
+                        ? Sequences.simple(Collections.singletonList(getNullTimeseriesResultValue(query)))
+                        : Sequences.simple(val);
+      } else {
+        finalSequence = baseResults;
+      }
       if (query.isGrandTotal()) {
         // Accumulate grand totals while iterating the sequence.
         final Object[] grandTotals = new Object[query.getAggregatorSpecs().size()];
         final Sequence<Result<TimeseriesResultValue>> mappedSequence = Sequences.map(
-            baseResults,
+            finalSequence,
             resultValue -> {
               for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
                 final AggregatorFactory aggregatorFactory = query.getAggregatorSpecs().get(i);
                 final Object value = resultValue.getValue().getMetric(aggregatorFactory.getName());
                 if (grandTotals[i] == null) {
                   grandTotals[i] = value;
                 } else {
                   grandTotals[i] = aggregatorFactory.combine(grandTotals[i], value);
                 }
               }
               return resultValue;
             }
         );
         return Sequences.concat(
             ImmutableList.of(
                 mappedSequence,
                 Sequences.simple(
                     () -> {
                       final Map<String, Object> totalsMap = new HashMap<>();
                       for (int i = 0; i < query.getAggregatorSpecs().size(); i++) {
                         totalsMap.put(query.getAggregatorSpecs().get(i).getName(), grandTotals[i]);
                       }
                       final Result<TimeseriesResultValue> result = new Result<>(
                           null,
                           new TimeseriesResultValue(totalsMap)
                       );
                       return Collections.singletonList(result).iterator();
                     }
                 )
             )
         );
       } else {
-        return baseResults;
+        return finalSequence;
       }
-      }
-    };
+    };
   }

+  private Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQuery query)
+  {
+    List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();
+    Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
+    String[] aggregatorNames = new String[aggregatorSpecs.size()];
+    for (int i = 0; i < aggregatorSpecs.size(); i++) {
+      aggregators[i] = aggregatorSpecs.get(i)
+                                      .factorize(RowBasedColumnSelectorFactory.create(() -> new MapBasedRow(null, null), null));
+      aggregatorNames[i] = aggregatorSpecs.get(i).getName();
+    }
+    final DateTime start = query.getIntervals().isEmpty() ? DateTimes.EPOCH : query.getIntervals().get(0).getStart();
+    TimeseriesResultBuilder bob = new TimeseriesResultBuilder(start);
+    for (int i = 0; i < aggregatorSpecs.size(); i++) {
+      bob.addMetric(aggregatorNames[i], aggregators[i]);
+      aggregators[i].close();
+    }
+    return bob.build();
+  }
+
   @Override
   public TimeseriesQueryMetrics makeMetrics(TimeseriesQuery query)
   {
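To spell out what the zeroed-out row contains (a description of the effect, not code from the patch): each aggregator is factorized against a row-based selector that never receives a row, so addMetric records the aggregator's identity value, and the row's timestamp is the start of the first query interval, falling back to the epoch when the interval list is empty.

    // For the aggregators used in the tests below, the default row comes out roughly as:
    //   CountAggregatorFactory("rows")              -> "rows"  = 0
    //   LongSumAggregatorFactory("index", "index")  -> "index" = 0
    //   addRowsIndexConstant post-aggregator        -> 1.0 once finalized (rows + index + 1)
    // timestamp = query.getIntervals().get(0).getStart(), or DateTimes.EPOCH when no interval is given.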
@@ -246,25 +276,20 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
       @Override
       public Function<Result<TimeseriesResultValue>, Object> prepareForCache(boolean isResultLevelCache)
       {
-        return new Function<Result<TimeseriesResultValue>, Object>()
-        {
-          @Override
-          public Object apply(final Result<TimeseriesResultValue> input)
-          {
+        return input -> {
           TimeseriesResultValue results = input.getValue();
           final List<Object> retVal = Lists.newArrayListWithCapacity(1 + aggs.size());
           retVal.add(input.getTimestamp().getMillis());
           for (AggregatorFactory agg : aggs) {
             retVal.add(results.getMetric(agg.getName()));
           }
           if (isResultLevelCache) {
             for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
               retVal.add(results.getMetric(postAgg.getName()));
             }
           }
           return retVal;
-          }
         };
       }
@@ -297,7 +322,7 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
             }
           }

-          return new Result<TimeseriesResultValue>(
+          return new Result<>(
              timestamp,
              new TimeseriesResultValue(retVal)
          );
@@ -311,20 +336,13 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
   public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TimeseriesResultValue>> runner)
   {
     return intervalChunkingQueryRunnerDecorator.decorate(
-        new QueryRunner<Result<TimeseriesResultValue>>()
-        {
-          @Override
-          public Sequence<Result<TimeseriesResultValue>> run(
-              QueryPlus<Result<TimeseriesResultValue>> queryPlus, Map<String, Object> responseContext
-          )
-          {
+        (queryPlus, responseContext) -> {
           TimeseriesQuery timeseriesQuery = (TimeseriesQuery) queryPlus.getQuery();
           if (timeseriesQuery.getDimensionsFilter() != null) {
             timeseriesQuery = timeseriesQuery.withDimFilter(timeseriesQuery.getDimensionsFilter().optimize());
             queryPlus = queryPlus.withQuery(timeseriesQuery);
           }
           return runner.run(queryPlus, responseContext);
-          }
         }, this);
   }
@@ -348,31 +366,26 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery>
       final TimeseriesQuery query, final MetricManipulationFn fn, final boolean calculatePostAggs
   )
   {
-    return new Function<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>()
-    {
-      @Override
-      public Result<TimeseriesResultValue> apply(Result<TimeseriesResultValue> result)
-      {
+    return result -> {
       final TimeseriesResultValue holder = result.getValue();
       final Map<String, Object> values = Maps.newHashMap(holder.getBaseObject());
       if (calculatePostAggs && !query.getPostAggregatorSpecs().isEmpty()) {
         // put non finalized aggregators for calculating dependent post Aggregators
         for (AggregatorFactory agg : query.getAggregatorSpecs()) {
           values.put(agg.getName(), holder.getMetric(agg.getName()));
         }
         for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
           values.put(postAgg.getName(), postAgg.compute(values));
         }
       }
       for (AggregatorFactory agg : query.getAggregatorSpecs()) {
         values.put(agg.getName(), fn.manipulate(agg, holder.getMetric(agg.getName())));
       }
-      return new Result<TimeseriesResultValue>(
+      return new Result<>(
           result.getTimestamp(),
           new TimeseriesResultValue(values)
       );
-      }
     };
   }
 }

TimeseriesQueryRunnerFactory.java

@@ -68,7 +68,7 @@ public class TimeseriesQueryRunnerFactory
       ExecutorService queryExecutor, Iterable<QueryRunner<Result<TimeseriesResultValue>>> queryRunners
   )
   {
-    return new ChainedExecutionQueryRunner<Result<TimeseriesResultValue>>(
+    return new ChainedExecutionQueryRunner<>(
         queryExecutor, queryWatcher, queryRunners
     );
   }
@@ -104,4 +104,5 @@ public class TimeseriesQueryRunnerFactory
       return engine.process((TimeseriesQuery) input, adapter);
     }
   }
 }

TimeseriesQueryRunnerTest.java

@@ -485,6 +485,56 @@
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesIntervalOutOfRanges()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.emptyInterval)
.aggregators(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
QueryRunnerTestHelper.indexLongSum
)
)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.descending(descending)
.context(ImmutableMap.of(TimeseriesQuery.SKIP_EMPTY_BUCKETS, false))
.build();
List<Result<TimeseriesResultValue>> expectedResults = new ArrayList<>();
expectedResults.add(
new Result<>(
QueryRunnerTestHelper.emptyInterval.getIntervals().get(0).getStart(),
new TimeseriesResultValue(
ImmutableMap.of(
"rows",
0L,
"index",
0L,
QueryRunnerTestHelper.addRowsIndexConstantMetric,
1.0
)
)
)
);
// Must create a toolChest so we can run mergeResults (which creates the zeroed-out row).
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
// Must be wrapped in a results finalizer to stop the runner's built-in finalizer from being called.
final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(
toolChest.mergeResults(runner),
toolChest
);
final List results = finalRunner.run(QueryPlus.wrap(query), CONTEXT).toList();
TestHelper.assertExpectedResults(expectedResults, results);
}
@Test
public void testTimeseriesWithVirtualColumn()
{

CalciteQueryTest.java

@@ -276,6 +276,73 @@ public class CalciteQueryTest extends CalciteTestBase
);
}
@Test
public void testSelectCountStar() throws Exception
{
testQuery(
PLANNER_CONFIG_DEFAULT,
QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS,
"SELECT exp(count(*)) + 10, sum(m2) FROM druid.foo WHERE dim2 = 0",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.filters(SELECTOR("dim2", "0", null))
.granularity(Granularities.ALL)
.aggregators(AGGS(
new CountAggregatorFactory("a0"),
new DoubleSumAggregatorFactory("a1", "m2")
))
.postAggregators(
EXPRESSION_POST_AGG("p0", "(exp(\"a0\") + 10)")
)
.context(QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS)
.build()),
ImmutableList.of(
new Object[]{11.0, 0.0}
)
);
testQuery(
PLANNER_CONFIG_DEFAULT,
QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS,
"SELECT exp(count(*)) + 10, sum(m2) FROM druid.foo WHERE __time >= TIMESTAMP '2999-01-01 00:00:00'",
CalciteTests.REGULAR_USER_AUTH_RESULT,
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Intervals.of("2999-01-01T00:00:00.000Z/146140482-04-24T15:36:27.903Z")))
.granularity(Granularities.ALL)
.aggregators(AGGS(
new CountAggregatorFactory("a0"),
new DoubleSumAggregatorFactory("a1", "m2")
))
.postAggregators(
EXPRESSION_POST_AGG("p0", "(exp(\"a0\") + 10)")
)
.context(QUERY_CONTEXT_DONT_SKIP_EMPTY_BUCKETS)
.build()),
ImmutableList.of(
new Object[]{11.0, 0.0}
)
);
testQuery(
"SELECT COUNT(*) FROM foo WHERE dim1 = 'nonexistent' GROUP BY FLOOR(__time TO DAY)",
ImmutableList.of(Druids.newTimeseriesQueryBuilder()
.dataSource(CalciteTests.DATASOURCE1)
.intervals(QSS(Filtration.eternity()))
.filters(SELECTOR("dim1", "nonexistent", null))
.granularity(Granularities.DAY)
.aggregators(AGGS(
new CountAggregatorFactory("a0")
))
.context(TIMESERIES_CONTEXT_DEFAULT)
.build()),
ImmutableList.of()
);
}
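The expected rows follow directly from the zeroed-out timeseries row; a quick check of the arithmetic, as Druid behaved at the time (no SQL NULL handling):

    // COUNT(*) over zero matching rows  -> a0 = 0
    // exp(a0) + 10 = exp(0) + 10        -> 11.0
    // SUM(m2) over zero matching rows   -> a1 = 0.0
    // The third query keeps GROUP BY FLOOR(__time TO DAY): granularity is DAY, not ALL,
    // so no default row is generated and the expected result list stays empty.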
@Test
public void testSelectTrimFamily() throws Exception
{