From 184b12bee849a2058e38488e01632046fc9dfc3f Mon Sep 17 00:00:00 2001
From: Nishant
Date: Tue, 7 Jul 2015 19:09:55 +0530
Subject: [PATCH] fix groupBy caching to work with renamed aggregators

Issue - when storing results in the cache, we store the event map, which maps
aggregator names to values. When the same query is fired again after renaming
the aggregators, the cache key stays the same, but the cached event still maps
the metric values to the old names, which leads to wrong results.

Fix - modify the cache to store only the actual list of values rather than the
raw event.

Also: addressed review comments and fixed dimension renaming.
---
 .../groupby/GroupByQueryQueryToolChest.java   |  48 ++++---
 .../client/CachingClusteredClientTest.java    | 117 ++++++++++++++++++
 2 files changed, 148 insertions(+), 17 deletions(-)
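For reference, a minimal standalone sketch of the idea behind the fix (plain
Java; the PositionalCacheSketch class and its method names are made up for
illustration and are not Druid's API): rows are cached positionally as
[timestamp, dimension values..., aggregator values...] and decoded against the
output names of the query being served, so a query that renames its aggregators
receives the cached values under the new names. Running main() prints the row
keyed by the renamed outputs (output2, rows2), which is what the new test below
asserts.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Illustration only; these types are not part of Druid.
    public class PositionalCacheSketch
    {
      // Encode a row as [timestamp, dim values..., agg values...] in query order.
      static List<Object> prepareForCache(long timestamp, Map<String, Object> event,
                                          List<String> dimNames, List<String> aggNames)
      {
        final List<Object> values = new ArrayList<>(1 + dimNames.size() + aggNames.size());
        values.add(timestamp);
        for (String dim : dimNames) {
          values.add(event.get(dim));
        }
        for (String agg : aggNames) {
          values.add(event.get(agg));
        }
        return values;
      }

      // Decode using the names of the query being served now, not the names used when caching.
      static Map<String, Object> pullFromCache(List<Object> cached,
                                               List<String> dimNames, List<String> aggNames)
      {
        final Iterator<Object> results = cached.iterator();
        final Map<String, Object> event = new LinkedHashMap<>();
        event.put("timestamp", results.next());
        for (String dim : dimNames) {
          event.put(dim, results.next());
        }
        for (String agg : aggNames) {
          event.put(agg, results.next());
        }
        if (results.hasNext()) {
          throw new IllegalStateException("Found left over objects while reading from cache");
        }
        return event;
      }

      public static void main(String[] args)
      {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("output", "a");
        row.put("rows", 1);
        row.put("imps", 1);

        // Cached under the original names...
        List<Object> cached = prepareForCache(0L, row, Arrays.asList("output"), Arrays.asList("rows", "imps"));
        // ...and decoded under renamed dimension/aggregators: values land under the new names.
        System.out.println(pullFromCache(cached, Arrays.asList("output2"), Arrays.asList("rows2", "imps")));
      }
    }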
diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index cf063dea9f2..5a12f5dfdf9 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -413,7 +413,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
     return new CacheStrategy<Row, Object, GroupByQuery>()
     {
+      private static final byte CACHE_STRATEGY_VERSION = 0x1;
       private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<DimensionSpec> dims = query.getDimensions();
+
       @Override
       public byte[] computeCacheKey(GroupByQuery query)
@@ -435,7 +438,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
-              final List<Object> retVal = Lists.newArrayListWithCapacity(2);
+              final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());
               retVal.add(row.getTimestamp().getMillis());
-              retVal.add(row.getEvent());
-
+              Map<String, Object> event = row.getEvent();
+              for (DimensionSpec dim : dims) {
+                retVal.add(event.get(dim.getOutputName()));
+              }
+              for (AggregatorFactory agg : aggs) {
+                retVal.add(event.get(agg.getName()));
+              }
               return retVal;
             }
@@ -500,21 +509,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery>
+            Map<String, Object> event = Maps.newLinkedHashMap();
+            Iterator<DimensionSpec> dimsIter = dims.iterator();
+            while (dimsIter.hasNext() && results.hasNext()) {
+              final DimensionSpec factory = dimsIter.next();
+              event.put(factory.getOutputName(), results.next());
+            }
+
             Iterator<AggregatorFactory> aggsIter = aggs.iterator();
-
-            Map<String, Object> event = jsonMapper.convertValue(
-                results.next(),
-                new TypeReference<Map<String, Object>>()
-                {
-                }
-            );
-
-            while (aggsIter.hasNext()) {
+            while (aggsIter.hasNext() && results.hasNext()) {
               final AggregatorFactory factory = aggsIter.next();
-              Object agg = event.get(factory.getName());
-              if (agg != null) {
-                event.put(factory.getName(), factory.deserialize(agg));
-              }
+              event.put(factory.getName(), factory.deserialize(results.next()));
+            }
+
+            if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
+              throw new ISE(
+                  "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
+                  dimsIter.hasNext(),
+                  aggsIter.hasNext(),
+                  results.hasNext()
+              );
+            }
 
             return new MapBasedRow(
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index df0ad3fe59e..664741a23e7 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -2363,4 +2363,121 @@ public class CachingClusteredClientTest
         makeTimeBoundaryResult(new DateTime("1970-01-05T01"), new DateTime("1970-01-05T01"), null)
     );
   }
+
+  @Test
+  public void testGroupByCachingRenamedAggs() throws Exception
+  {
+    GroupByQuery.Builder builder = new GroupByQuery.Builder()
+        .setDataSource(DATA_SOURCE)
+        .setQuerySegmentSpec(SEG_SPEC)
+        .setDimFilter(DIM_FILTER)
+        .setGranularity(GRANULARITY)
+        .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output")))
+        .setAggregatorSpecs(AGGS)
+        .setContext(CONTEXT);
+
+    testQueryCaching(
+        client,
+        builder.build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)),
+
+        new Interval("2011-01-02/2011-01-03"),
+        makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        ),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        )
+    );
+
+    Supplier<GroupByQueryConfig> configSupplier = new Supplier<GroupByQueryConfig>()
+    {
+      @Override
+      public GroupByQueryConfig get()
+      {
+        return new GroupByQueryConfig();
+      }
+    };
+    QueryRunner runner = new FinalizeResultsQueryRunner(
+        client,
+        new GroupByQueryQueryToolChest(
+            configSupplier,
+            jsonMapper,
+            new GroupByQueryEngine(
+                configSupplier,
+                new StupidPool<>(
+                    new Supplier<ByteBuffer>()
+                    {
+                      @Override
+                      public ByteBuffer get()
+                      {
+                        return ByteBuffer.allocate(1024 * 1024);
+                      }
+                    }
+                )
+            ),
+            TestQueryRunners.pool,
+            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+        )
+    );
+    HashMap<String, List> context = new HashMap<String, List>();
+    TestHelper.assertExpectedObjects(
+        makeGroupByResults(
+            new DateTime("2011-01-05T"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
"impers", 4), + new DateTime("2011-01-07T"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-08T"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7), + new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7) + ), + runner.run( + builder.setInterval("2011-01-05/2011-01-10") + .build(), + context + ), + "" + ); + + TestHelper.assertExpectedObjects( + makeGroupByResults( + new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3), + new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3), + new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4), + new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4), + new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5), + new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5), + new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6), + new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6), + new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7), + new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7) + ), + runner.run( + builder.setInterval("2011-01-05/2011-01-10") + .setDimensions(Arrays.asList(new DefaultDimensionSpec("a", "output2"))) + .setAggregatorSpecs(RENAMED_AGGS) + .build(), + context + ), + "renamed aggregators test" + ); + } + }