diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
index cf063dea9f2..5a12f5dfdf9 100644
--- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryQueryToolChest.java
@@ -413,7 +413,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuer
     return new CacheStrategy<Row, Object, GroupByQuery>()
     {
+      private static final byte CACHE_STRATEGY_VERSION = 0x1;
       private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<DimensionSpec> dims = query.getDimensions();
+
       @Override
       public byte[] computeCacheKey(GroupByQuery query)
@@ -435,7 +438,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuer
-        final List<Object> retVal = Lists.newArrayListWithCapacity(2);
+        final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());
         retVal.add(row.getTimestamp().getMillis());
-        retVal.add(row.getEvent());
-
+        Map<String, Object> event = row.getEvent();
+        for (DimensionSpec dim : dims) {
+          retVal.add(event.get(dim.getOutputName()));
+        }
+        for (AggregatorFactory agg : aggs) {
+          retVal.add(event.get(agg.getName()));
+        }
         return retVal;
       }
@@ -500,21 +509,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuer
 
             DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());
 
+            Map<String, Object> event = Maps.newLinkedHashMap();
+            Iterator<DimensionSpec> dimsIter = dims.iterator();
+            while (dimsIter.hasNext() && results.hasNext()) {
+              final DimensionSpec factory = dimsIter.next();
+              event.put(factory.getOutputName(), results.next());
+            }
+
             Iterator<AggregatorFactory> aggsIter = aggs.iterator();
-
-            Map<String, Object> event = jsonMapper.convertValue(
-                results.next(),
-                new TypeReference<Map<String, Object>>()
-                {
-                }
-            );
-
-            while (aggsIter.hasNext()) {
+            while (aggsIter.hasNext() && results.hasNext()) {
               final AggregatorFactory factory = aggsIter.next();
-              Object agg = event.get(factory.getName());
-              if (agg != null) {
-                event.put(factory.getName(), factory.deserialize(agg));
-              }
+              event.put(factory.getName(), factory.deserialize(results.next()));
+            }
+
+            if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
+              throw new ISE(
+                  "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
+                  dimsIter.hasNext(),
+                  aggsIter.hasNext(),
+                  results.hasNext()
+              );
             }
 
             return new MapBasedRow(
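
The cache strategy above replaces the old name-keyed event map with a positional encoding: each cached row is the list [timestamp millis, dimension values in the query's dimension order, aggregator values in the query's aggregator order], and pullFromCache rebuilds the event using the current query's output names. The sketch below illustrates that round trip with plain Java collections; the class and helper names are hypothetical, and only the ordering contract is taken from the patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PositionalRowCacheSketch
    {
      // Encode: [timestamp, dim values in dimension order, agg values in aggregator order].
      static List<Object> encode(long timestampMillis, Map<String, Object> event, List<String> dimNames, List<String> aggNames)
      {
        final List<Object> retVal = new ArrayList<>(1 + dimNames.size() + aggNames.size());
        retVal.add(timestampMillis);
        for (String dim : dimNames) {
          retVal.add(event.get(dim));
        }
        for (String agg : aggNames) {
          retVal.add(event.get(agg));
        }
        return retVal;
      }

      // Decode using the *current* query's output names; positions, not names, are stored,
      // which is what lets a query with renamed outputs still read older cache entries.
      static Map<String, Object> decode(List<Object> cached, List<String> dimNames, List<String> aggNames)
      {
        final Iterator<Object> results = cached.iterator();
        results.next(); // timestamp; the real code turns this back into a DateTime
        final Map<String, Object> event = new LinkedHashMap<>();
        for (String dim : dimNames) {
          event.put(dim, results.next());
        }
        for (String agg : aggNames) {
          event.put(agg, results.next());
        }
        return event;
      }

      public static void main(String[] args)
      {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("output", "a");
        event.put("imps", 1);

        List<Object> cached = encode(0L, event, Arrays.asList("output"), Arrays.asList("imps"));
        Map<String, Object> renamed = decode(cached, Arrays.asList("output2"), Arrays.asList("imps2"));
        System.out.println(renamed); // prints {output2=a, imps2=1}
      }
    }
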
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index df0ad3fe59e..664741a23e7 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -2363,4 +2363,121 @@ public class CachingClusteredClientTest
         makeTimeBoundaryResult(new DateTime("1970-01-05T01"), new DateTime("1970-01-05T01"), null)
     );
   }
+
+  @Test
+  public void testGroupByCachingRenamedAggs() throws Exception
+  {
+    GroupByQuery.Builder builder = new GroupByQuery.Builder()
+        .setDataSource(DATA_SOURCE)
+        .setQuerySegmentSpec(SEG_SPEC)
+        .setDimFilter(DIM_FILTER)
+        .setGranularity(GRANULARITY)
+        .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output")))
+        .setAggregatorSpecs(AGGS)
+        .setContext(CONTEXT);
+
+    testQueryCaching(
+        client,
+        builder.build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)),
+
+        new Interval("2011-01-02/2011-01-03"),
+        makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        ),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        )
+    );
+
+    Supplier<GroupByQueryConfig> configSupplier = new Supplier<GroupByQueryConfig>()
+    {
+      @Override
+      public GroupByQueryConfig get()
+      {
+        return new GroupByQueryConfig();
+      }
+    };
+    QueryRunner runner = new FinalizeResultsQueryRunner(
+        client,
+        new GroupByQueryQueryToolChest(
+            configSupplier,
+            jsonMapper,
+            new GroupByQueryEngine(
+                configSupplier,
+                new StupidPool<>(
+                    new Supplier<ByteBuffer>()
+                    {
+                      @Override
+                      public ByteBuffer get()
+                      {
+                        return ByteBuffer.allocate(1024 * 1024);
+                      }
+                    }
+                )
+            ),
+            TestQueryRunners.pool,
+            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+        )
+    );
+    HashMap<String, Object> context = new HashMap<String, Object>();
+    TestHelper.assertExpectedObjects(
+        makeGroupByResults(
+            new DateTime("2011-01-05T"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
"impers", 4), + new DateTime("2011-01-07T"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5), + new DateTime("2011-01-08T"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6), + new DateTime("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7), + new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7) + ), + runner.run( + builder.setInterval("2011-01-05/2011-01-10") + .build(), + context + ), + "" + ); + + TestHelper.assertExpectedObjects( + makeGroupByResults( + new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3), + new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3), + new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4), + new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4), + new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5), + new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5), + new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6), + new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6), + new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7), + new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7) + ), + runner.run( + builder.setInterval("2011-01-05/2011-01-10") + .setDimensions(Arrays.asList(new DefaultDimensionSpec("a", "output2"))) + .setAggregatorSpecs(RENAMED_AGGS) + .build(), + context + ), + "renamed aggregators test" + ); + } + }