Merge pull request #109 from metamx/fix-groupby-allgran-timestamp

Fix the output timestamps of groupBy queries that use granularity "all".
This commit is contained in:
fjy 2013-03-25 12:58:32 -07:00
commit 669f792668
2 changed files with 29 additions and 7 deletions

View File

@ -37,7 +37,6 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.input.Rows;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
@ -78,7 +77,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
final GroupByQuery query = (GroupByQuery) input;
List<Interval> condensed = query.getIntervals();
final QueryGranularity gran = query.getGranularity();
final long timeStart = query.getIntervals().get(0).getStartMillis();
// use gran.iterable instead of gran.truncate so that
// AllGranularity returns timeStart instead of Long.MIN_VALUE
final long granTimeStart = gran.iterable(timeStart, timeStart+1).iterator().next();
final List<AggregatorFactory> aggs = Lists.transform(
query.getAggregatorSpecs(),
new Function<AggregatorFactory, AggregatorFactory>()
@ -102,10 +107,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
);
final QueryGranularity gran = query.getGranularity();
final IncrementalIndex index = runner.run(query).accumulate(
new IncrementalIndex(
gran.truncate(condensed.get(0).getStartMillis()),
// use the granularity-truncated minimum timestamp, since incoming
// truncated timestamps may precede timeStart
granTimeStart,
gran,
aggs.toArray(new AggregatorFactory[aggs.size()])
),
@ -128,13 +134,11 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
private final QueryGranularity granularity = query.getGranularity();
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(granularity.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
}
}
);

View File

@ -30,6 +30,7 @@ import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
@ -232,6 +233,7 @@ public class GroupByQueryRunnerTest
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null));
final GroupByQuery fullQuery = builder.build();
final GroupByQuery allGranQuery = builder.copy().setGranularity(QueryGranularity.ALL).build();
QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
new QueryRunner<Row>()
@ -265,6 +267,22 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
List<Row> allGranExpectedResults = Arrays.<Row>asList(
createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 269L),
createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 217L),
createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 319L),
createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 216L),
createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 221L),
createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 4416L),
createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 177L),
createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 243L)
);
TestHelper.assertExpectedObjects(allGranExpectedResults, runner.run(allGranQuery), "direct");
TestHelper.assertExpectedObjects(allGranExpectedResults, mergedRunner.run(allGranQuery), "merged");
}
private MapBasedRow createExpectedRow(final String timestamp, Object... vals)