Merge pull request #1499 from metamx/fix-groupby-caching

fix groupBy caching to work with renamed aggregators
This commit is contained in:
Xavier Léauté 2015-07-08 23:52:11 -07:00
commit b3d360f990
2 changed files with 148 additions and 17 deletions

View File

@ -413,7 +413,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
return new CacheStrategy<Row, Object, GroupByQuery>()
{
private static final byte CACHE_STRATEGY_VERSION = 0x1;
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<DimensionSpec> dims = query.getDimensions();
@Override
public byte[] computeCacheKey(GroupByQuery query)
@ -435,7 +438,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
ByteBuffer buffer = ByteBuffer
.allocate(
1
2
+ granularityBytes.length
+ filterBytes.length
+ aggregatorBytes.length
@ -444,6 +447,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
+ limitBytes.length
)
.put(GROUPBY_QUERY)
.put(CACHE_STRATEGY_VERSION)
.put(granularityBytes)
.put(filterBytes)
.put(aggregatorBytes);
@ -474,10 +478,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
if (input instanceof MapBasedRow) {
final MapBasedRow row = (MapBasedRow) input;
final List<Object> retVal = Lists.newArrayListWithCapacity(2);
final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());
retVal.add(row.getTimestamp().getMillis());
retVal.add(row.getEvent());
Map<String, Object> event = row.getEvent();
for (DimensionSpec dim : dims) {
retVal.add(event.get(dim.getOutputName()));
}
for (AggregatorFactory agg : aggs) {
retVal.add(event.get(agg.getName()));
}
return retVal;
}
@ -500,21 +509,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());
Map<String, Object> event = Maps.newLinkedHashMap();
Iterator<DimensionSpec> dimsIter = dims.iterator();
while (dimsIter.hasNext() && results.hasNext()) {
final DimensionSpec factory = dimsIter.next();
event.put(factory.getOutputName(), results.next());
}
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Map<String, Object> event = jsonMapper.convertValue(
results.next(),
new TypeReference<Map<String, Object>>()
{
}
);
while (aggsIter.hasNext()) {
while (aggsIter.hasNext() && results.hasNext()) {
final AggregatorFactory factory = aggsIter.next();
Object agg = event.get(factory.getName());
if (agg != null) {
event.put(factory.getName(), factory.deserialize(agg));
event.put(factory.getName(), factory.deserialize(results.next()));
}
if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
throw new ISE(
"Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
dimsIter.hasNext(),
aggsIter.hasNext(),
results.hasNext()
);
}
return new MapBasedRow(

View File

@ -2363,4 +2363,121 @@ public class CachingClusteredClientTest
makeTimeBoundaryResult(new DateTime("1970-01-05T01"), new DateTime("1970-01-05T01"), null)
);
}
@Test
public void testGroupByCachingRenamedAggs() throws Exception
{
GroupByQuery.Builder builder = new GroupByQuery.Builder()
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(SEG_SPEC)
.setDimFilter(DIM_FILTER)
.setGranularity(GRANULARITY)
.setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output")))
.setAggregatorSpecs(AGGS)
.setContext(CONTEXT);
testQueryCaching(
client,
builder.build(),
new Interval("2011-01-01/2011-01-02"),
makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)),
new Interval("2011-01-02/2011-01-03"),
makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
),
new Interval("2011-01-05/2011-01-10"),
makeGroupByResults(
new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
)
);
Supplier<GroupByQueryConfig> configSupplier = new Supplier<GroupByQueryConfig>()
{
@Override
public GroupByQueryConfig get()
{
return new GroupByQueryConfig();
}
};
QueryRunner runner = new FinalizeResultsQueryRunner(
client,
new GroupByQueryQueryToolChest(
configSupplier,
jsonMapper,
new GroupByQueryEngine(
configSupplier,
new StupidPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 1024);
}
}
)
),
TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);
HashMap<String, Object> context = new HashMap<String, Object>();
TestHelper.assertExpectedObjects(
makeGroupByResults(
new DateTime("2011-01-05T"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
new DateTime("2011-01-06T"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
new DateTime("2011-01-07T"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
new DateTime("2011-01-08T"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
new DateTime("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7),
new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
),
runner.run(
builder.setInterval("2011-01-05/2011-01-10")
.build(),
context
),
""
);
TestHelper.assertExpectedObjects(
makeGroupByResults(
new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7),
new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7)
),
runner.run(
builder.setInterval("2011-01-05/2011-01-10")
.setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output2")))
.setAggregatorSpecs(RENAMED_AGGS)
.build(),
context
),
"renamed aggregators test"
);
}
}