mirror of https://github.com/apache/druid.git
fix output timestamps in groupby queries with granularity "all".
This commit is contained in:
parent
e72695111c
commit
c420fe3b56
|
@ -37,7 +37,6 @@ import com.metamx.druid.initialization.Initialization;
|
||||||
import com.metamx.druid.input.MapBasedRow;
|
import com.metamx.druid.input.MapBasedRow;
|
||||||
import com.metamx.druid.input.Row;
|
import com.metamx.druid.input.Row;
|
||||||
import com.metamx.druid.input.Rows;
|
import com.metamx.druid.input.Rows;
|
||||||
import com.metamx.druid.query.CacheStrategy;
|
|
||||||
import com.metamx.druid.query.MetricManipulationFn;
|
import com.metamx.druid.query.MetricManipulationFn;
|
||||||
import com.metamx.druid.query.QueryRunner;
|
import com.metamx.druid.query.QueryRunner;
|
||||||
import com.metamx.druid.query.QueryToolChest;
|
import com.metamx.druid.query.QueryToolChest;
|
||||||
|
@ -78,7 +77,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
{
|
{
|
||||||
final GroupByQuery query = (GroupByQuery) input;
|
final GroupByQuery query = (GroupByQuery) input;
|
||||||
|
|
||||||
List<Interval> condensed = query.getIntervals();
|
final QueryGranularity gran = query.getGranularity();
|
||||||
|
final long timeStart = query.getIntervals().get(0).getStartMillis();
|
||||||
|
|
||||||
|
// use gran.iterable instead of gran.truncate so that
|
||||||
|
// AllGranularity returns timeStart instead of Long.MIN_VALUE
|
||||||
|
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
|
||||||
|
|
||||||
final List<AggregatorFactory> aggs = Lists.transform(
|
final List<AggregatorFactory> aggs = Lists.transform(
|
||||||
query.getAggregatorSpecs(),
|
query.getAggregatorSpecs(),
|
||||||
new Function<AggregatorFactory, AggregatorFactory>()
|
new Function<AggregatorFactory, AggregatorFactory>()
|
||||||
|
@ -102,10 +107,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
final QueryGranularity gran = query.getGranularity();
|
|
||||||
final IncrementalIndex index = runner.run(query).accumulate(
|
final IncrementalIndex index = runner.run(query).accumulate(
|
||||||
new IncrementalIndex(
|
new IncrementalIndex(
|
||||||
gran.truncate(condensed.get(0).getStartMillis()),
|
// use granularity truncated min timestamp
|
||||||
|
// since incoming truncated timestamps may precede timeStart
|
||||||
|
granTimeStart,
|
||||||
gran,
|
gran,
|
||||||
aggs.toArray(new AggregatorFactory[aggs.size()])
|
aggs.toArray(new AggregatorFactory[aggs.size()])
|
||||||
),
|
),
|
||||||
|
@ -128,13 +134,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
||||||
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
|
||||||
new Function<Row, Row>()
|
new Function<Row, Row>()
|
||||||
{
|
{
|
||||||
private final QueryGranularity granularity = query.getGranularity();
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Row apply(Row input)
|
public Row apply(Row input)
|
||||||
{
|
{
|
||||||
final MapBasedRow row = (MapBasedRow) input;
|
final MapBasedRow row = (MapBasedRow) input;
|
||||||
return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
|
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.druid.PeriodGranularity;
|
import com.metamx.druid.PeriodGranularity;
|
||||||
import com.metamx.druid.Query;
|
import com.metamx.druid.Query;
|
||||||
|
import com.metamx.druid.QueryGranularity;
|
||||||
import com.metamx.druid.TestHelper;
|
import com.metamx.druid.TestHelper;
|
||||||
import com.metamx.druid.aggregation.AggregatorFactory;
|
import com.metamx.druid.aggregation.AggregatorFactory;
|
||||||
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
|
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
|
||||||
|
@ -232,6 +233,7 @@ public class GroupByQueryRunnerTest
|
||||||
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
|
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
|
||||||
|
|
||||||
final GroupByQuery fullQuery = builder.build();
|
final GroupByQuery fullQuery = builder.build();
|
||||||
|
final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build();
|
||||||
|
|
||||||
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
|
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
|
||||||
new QueryRunner<Row>()
|
new QueryRunner<Row>()
|
||||||
|
@ -265,6 +267,22 @@ public class GroupByQueryRunnerTest
|
||||||
|
|
||||||
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
|
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
|
||||||
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
|
||||||
|
|
||||||
|
List<Row> allGranExpectedResults = Arrays.<Row>asList(
|
||||||
|
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 216L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 221L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 4416L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L),
|
||||||
|
createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
|
||||||
|
);
|
||||||
|
|
||||||
|
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
|
||||||
|
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
|
private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
|
||||||
|
|
Loading…
Reference in New Issue