mirror of https://github.com/apache/druid.git

fix groupBy caching to work with renamed aggregators

Issue: when storing groupBy results in the cache, we store the event map, which maps aggregator names to values. If the same query is fired again after renaming its aggregators, the cache key is unchanged, but the cached event still maps the values to the old names, which leads to wrong results.

Fix: change the cache to store only the ordered list of values rather than the raw event map, so names are re-attached from the current query on the way out. Also addresses review comments and fixes the same problem for renamed dimensions.
This commit is contained in:

parent 31d05e33a1
commit 184b12bee8
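To make the mechanism concrete, here is a minimal sketch of the idea behind the fix. PositionalRowCodec and every name in it are hypothetical stand-ins, not Druid's actual classes: cache values positionally in the query's declared column order, then re-attach whatever output names the current query uses when reading back.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PositionalRowCodec
{
  // Serialize: pull values out of the event map in the query's declared column order.
  static List<Object> toCacheValues(long timestampMillis, Map<String, Object> event, List<String> outputNames)
  {
    List<Object> values = new ArrayList<>(1 + outputNames.size());
    values.add(timestampMillis);
    for (String name : outputNames) {
      values.add(event.get(name));
    }
    return values;
  }

  // Deserialize: re-attach the *current* query's output names to the cached positional values.
  static Map<String, Object> fromCacheValues(List<Object> values, List<String> outputNames)
  {
    Map<String, Object> event = new LinkedHashMap<>();
    for (int i = 0; i < outputNames.size(); i++) {
      event.put(outputNames.get(i), values.get(i + 1)); // index 0 holds the timestamp
    }
    return event;
  }
}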
@@ -413,7 +413,10 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
   {
     return new CacheStrategy<Row, Object, GroupByQuery>()
     {
+      private static final byte CACHE_STRATEGY_VERSION = 0x1;
+      private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+      private final List<DimensionSpec> dims = query.getDimensions();

       @Override
       public byte[] computeCacheKey(GroupByQuery query)
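Design note: because CACHE_STRATEGY_VERSION is now part of the cache key (see the next two hunks), entries written in the old map-based layout can no longer match the new key, so the positional reader below never has to decode the old format.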
@@ -435,7 +438,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery

         ByteBuffer buffer = ByteBuffer
             .allocate(
-                1
+                2
                 + granularityBytes.length
                 + filterBytes.length
                 + aggregatorBytes.length
@@ -444,6 +447,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
                 + limitBytes.length
             )
             .put(GROUPBY_QUERY)
+            .put(CACHE_STRATEGY_VERSION)
             .put(granularityBytes)
             .put(filterBytes)
             .put(aggregatorBytes);
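The fixed overhead grows from 1 to 2 bytes because the key now carries the strategy version right after the query-type byte. A hedged sketch of the resulting layout follows; CacheKeys and buildKey are hypothetical names, and the byte arrays mirror the ones in the hunks above.

import java.nio.ByteBuffer;

class CacheKeys
{
  // Key layout: [queryType][version][granularity][filter][aggregators]...[limit]
  static byte[] buildKey(byte queryType, byte version, byte[]... parts)
  {
    int size = 2; // one byte for the query type, one for the strategy version
    for (byte[] part : parts) {
      size += part.length;
    }
    ByteBuffer buffer = ByteBuffer.allocate(size).put(queryType).put(version);
    for (byte[] part : parts) {
      buffer.put(part);
    }
    return buffer.array();
  }
}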
@@ -474,10 +478,15 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
           {
             if (input instanceof MapBasedRow) {
               final MapBasedRow row = (MapBasedRow) input;
-              final List<Object> retVal = Lists.newArrayListWithCapacity(2);
+              final List<Object> retVal = Lists.newArrayListWithCapacity(1 + dims.size() + aggs.size());
               retVal.add(row.getTimestamp().getMillis());
-              retVal.add(row.getEvent());
-
+              Map<String, Object> event = row.getEvent();
+              for (DimensionSpec dim : dims) {
+                retVal.add(event.get(dim.getOutputName()));
+              }
+              for (AggregatorFactory agg : aggs) {
+                retVal.add(event.get(agg.getName()));
+              }
               return retVal;
             }
@@ -500,21 +509,26 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery

             DateTime timestamp = granularity.toDateTime(((Number) results.next()).longValue());

-            Map<String, Object> event = jsonMapper.convertValue(
-                results.next(),
-                new TypeReference<Map<String, Object>>()
-                {
-                }
-            );
+            Map<String, Object> event = Maps.newLinkedHashMap();
+            Iterator<DimensionSpec> dimsIter = dims.iterator();
+            while (dimsIter.hasNext() && results.hasNext()) {
+              final DimensionSpec factory = dimsIter.next();
+              event.put(factory.getOutputName(), results.next());
+            }

             Iterator<AggregatorFactory> aggsIter = aggs.iterator();
-            while (aggsIter.hasNext()) {
+            while (aggsIter.hasNext() && results.hasNext()) {
               final AggregatorFactory factory = aggsIter.next();
-              Object agg = event.get(factory.getName());
-              if (agg != null) {
-                event.put(factory.getName(), factory.deserialize(agg));
-              }
+              event.put(factory.getName(), factory.deserialize(results.next()));
             }

+            if (dimsIter.hasNext() || aggsIter.hasNext() || results.hasNext()) {
+              throw new ISE(
+                  "Found left over objects while reading from cache!! dimsIter[%s] aggsIter[%s] results[%s]",
+                  dimsIter.hasNext(),
+                  aggsIter.hasNext(),
+                  results.hasNext()
+              );
+            }
+
             return new MapBasedRow(
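Read together, prepareForCache and pullFromCache form a positional round trip, and the strict leftover check fails loudly on any mismatch between the cached layout and the current query instead of returning a partially filled row. A usage sketch against the hypothetical PositionalRowCodec from the top of this page shows why a rename now round-trips cleanly:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RenameRoundTrip
{
  public static void main(String[] args)
  {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("output", "a"); // names in effect when the row was cached
    event.put("impers", 1);

    List<Object> cached = PositionalRowCodec.toCacheValues(0L, event, Arrays.asList("output", "impers"));

    // Decoded under the renamed outputs: only positions were stored, so the names
    // come from the current query, not from the cache entry.
    Map<String, Object> renamed = PositionalRowCodec.fromCacheValues(cached, Arrays.asList("output2", "impers2"));
    System.out.println(renamed); // {output2=a, impers2=1}
  }
}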
@@ -2363,4 +2363,121 @@ public class CachingClusteredClientTest
         makeTimeBoundaryResult(new DateTime("1970-01-05T01"), new DateTime("1970-01-05T01"), null)
     );
   }
+
+  @Test
+  public void testGroupByCachingRenamedAggs() throws Exception
+  {
+    GroupByQuery.Builder builder = new GroupByQuery.Builder()
+        .setDataSource(DATA_SOURCE)
+        .setQuerySegmentSpec(SEG_SPEC)
+        .setDimFilter(DIM_FILTER)
+        .setGranularity(GRANULARITY)
+        .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output")))
+        .setAggregatorSpecs(AGGS)
+        .setContext(CONTEXT);
+
+    testQueryCaching(
+        client,
+        builder.build(),
+        new Interval("2011-01-01/2011-01-02"),
+        makeGroupByResults(new DateTime("2011-01-01"), ImmutableMap.of("output", "a", "rows", 1, "imps", 1, "impers", 1)),
+
+        new Interval("2011-01-02/2011-01-03"),
+        makeGroupByResults(new DateTime("2011-01-02"), ImmutableMap.of("output", "b", "rows", 2, "imps", 2, "impers", 2)),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        ),
+
+        new Interval("2011-01-05/2011-01-10"),
+        makeGroupByResults(
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        )
+    );
+
+    Supplier<GroupByQueryConfig> configSupplier = new Supplier<GroupByQueryConfig>()
+    {
+      @Override
+      public GroupByQueryConfig get()
+      {
+        return new GroupByQueryConfig();
+      }
+    };
+    QueryRunner runner = new FinalizeResultsQueryRunner(
+        client,
+        new GroupByQueryQueryToolChest(
+            configSupplier,
+            jsonMapper,
+            new GroupByQueryEngine(
+                configSupplier,
+                new StupidPool<>(
+                    new Supplier<ByteBuffer>()
+                    {
+                      @Override
+                      public ByteBuffer get()
+                      {
+                        return ByteBuffer.allocate(1024 * 1024);
+                      }
+                    }
+                )
+            ),
+            TestQueryRunners.pool,
+            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+        )
+    );
+    HashMap<String, Object> context = new HashMap<String, Object>();
+    TestHelper.assertExpectedObjects(
+        makeGroupByResults(
+            new DateTime("2011-01-05T"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output", "c", "rows", 3, "imps", 3, "impers", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output", "d", "rows", 4, "imps", 4, "impers", 4),
+            new DateTime("2011-01-07T"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output", "e", "rows", 5, "imps", 5, "impers", 5),
+            new DateTime("2011-01-08T"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output", "f", "rows", 6, "imps", 6, "impers", 6),
+            new DateTime("2011-01-09T"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output", "g", "rows", 7, "imps", 7, "impers", 7)
+        ),
+        runner.run(
+            builder.setInterval("2011-01-05/2011-01-10")
+                   .build(),
+            context
+        ),
+        ""
+    );
+
+    TestHelper.assertExpectedObjects(
+        makeGroupByResults(
+            new DateTime("2011-01-05T"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
+            new DateTime("2011-01-05T01"), ImmutableMap.of("output2", "c", "rows2", 3, "imps", 3, "impers2", 3),
+            new DateTime("2011-01-06T"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
+            new DateTime("2011-01-06T01"), ImmutableMap.of("output2", "d", "rows2", 4, "imps", 4, "impers2", 4),
+            new DateTime("2011-01-07T"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
+            new DateTime("2011-01-07T01"), ImmutableMap.of("output2", "e", "rows2", 5, "imps", 5, "impers2", 5),
+            new DateTime("2011-01-08T"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
+            new DateTime("2011-01-08T01"), ImmutableMap.of("output2", "f", "rows2", 6, "imps", 6, "impers2", 6),
+            new DateTime("2011-01-09T"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7),
+            new DateTime("2011-01-09T01"), ImmutableMap.of("output2", "g", "rows2", 7, "imps", 7, "impers2", 7)
+        ),
+        runner.run(
+            builder.setInterval("2011-01-05/2011-01-10")
+                   .setDimensions(Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("a", "output2")))
+                   .setAggregatorSpecs(RENAMED_AGGS)
+                   .build(),
+            context
+        ),
+        "renamed aggregators test"
+    );
+  }
 }
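The test leans on AGGS and RENAMED_AGGS, which are defined elsewhere in CachingClusteredClientTest and are not part of this diff. A hypothetical sketch of the shape those constants would need (the exact factory types are an assumption): both lists must reference the same underlying columns, so the two queries compute the same cache key, while exposing different output names.

import java.util.Arrays;
import java.util.List;

import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;

class RenamedAggsFixture
{
  // Hypothetical stand-ins: same input columns (so identical cache keys), new output names.
  static final List<AggregatorFactory> AGGS = Arrays.<AggregatorFactory>asList(
      new CountAggregatorFactory("rows"),
      new LongSumAggregatorFactory("imps", "imps"),
      new LongSumAggregatorFactory("impers", "imps")
  );
  static final List<AggregatorFactory> RENAMED_AGGS = Arrays.<AggregatorFactory>asList(
      new CountAggregatorFactory("rows2"),
      new LongSumAggregatorFactory("imps", "imps"),
      new LongSumAggregatorFactory("impers2", "imps")
  );
}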