topN: Fix caching of Float dimension values. (#5653)

Jackson would deserialize them as Doubles, leading to ClassCastExceptions
in the topN processing pipeline when it attempted to treat them as Floats.
This commit is contained in:
Gian Merlino 2018-04-17 15:35:18 -07:00 committed by Fangjin Yang
parent a7ba2bf275
commit 5d09f76df6
2 changed files with 89 additions and 70 deletions

View File

@ -388,6 +388,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
Iterator<Object> inputIter = results.iterator();
DateTime timestamp = granularity.toDateTime(((Number) inputIter.next()).longValue());
// Need a value transformer to convert generic Jackson-deserialized type into the proper type.
final Function<Object, Object> dimValueTransformer = TopNMapFn.getValueTransformer(
query.getDimensionSpec().getOutputType()
);
while (inputIter.hasNext()) {
List<Object> result = (List<Object>) inputIter.next();
Map<String, Object> vals = Maps.newLinkedHashMap();
@ -395,7 +400,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
Iterator<AggregatorFactory> aggIter = aggs.iterator();
Iterator<Object> resultIter = result.iterator();
vals.put(query.getDimensionSpec().getOutputName(), resultIter.next());
vals.put(query.getDimensionSpec().getOutputName(), dimValueTransformer.apply(resultIter.next()));
while (aggIter.hasNext() && resultIter.hasNext()) {
final AggregatorFactory factory = aggIter.next();

View File

@ -47,9 +47,11 @@ import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.TestIndex;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ValueType;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
@ -61,76 +63,15 @@ public class TopNQueryQueryToolChestTest
@Test
public void testCacheStrategy() throws Exception
{
CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy =
new TopNQueryQueryToolChest(null, null).getCacheStrategy(
new TopNQuery(
new TableDataSource("dummy"),
VirtualColumns.EMPTY,
new DefaultDimensionSpec("test", "test"),
new NumericTopNMetricSpec("metric1"),
3,
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
null,
Granularities.ALL,
ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),
ImmutableList.<PostAggregator>of(new ConstantPostAggregator("post", 10)),
null
)
);
final Result<TopNResultValue> result1 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"test", "val1",
"metric1", 2
)
)
)
);
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
result1
);
ObjectMapper objectMapper = TestHelper.makeJsonMapper();
Object fromCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedValue),
strategy.getCacheObjectClazz()
);
Result<TopNResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result1, fromCacheResult);
final Result<TopNResultValue> result2 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"test", "val1",
"metric1", 2,
"post", 10
)
)
)
);
Object preparedResultCacheValue = strategy.prepareForCache(true).apply(
result2
);
Object fromResultCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedResultCacheValue),
strategy.getCacheObjectClazz()
);
Result<TopNResultValue> fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue);
Assert.assertEquals(result2, fromResultCacheResult);
doTestCacheStrategy(ValueType.STRING, "val1");
doTestCacheStrategy(ValueType.FLOAT, 2.1f);
doTestCacheStrategy(ValueType.DOUBLE, 2.1d);
doTestCacheStrategy(ValueType.LONG, 2L);
}
@Test
public void testCacheStrategyWithFloatDimension() throws Exception
{
}
@Test
@ -242,6 +183,79 @@ public class TopNQueryQueryToolChestTest
Assert.assertEquals(2000, mockRunner.query.getThreshold());
}
private void doTestCacheStrategy(final ValueType valueType, final Object dimValue) throws IOException
{
CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> strategy =
new TopNQueryQueryToolChest(null, null).getCacheStrategy(
new TopNQuery(
new TableDataSource("dummy"),
VirtualColumns.EMPTY,
new DefaultDimensionSpec("test", "test", valueType),
new NumericTopNMetricSpec("metric1"),
3,
new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
null,
Granularities.ALL,
ImmutableList.<AggregatorFactory>of(new CountAggregatorFactory("metric1")),
ImmutableList.<PostAggregator>of(new ConstantPostAggregator("post", 10)),
null
)
);
final Result<TopNResultValue> result1 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"test", dimValue,
"metric1", 2
)
)
)
);
Object preparedValue = strategy.prepareForSegmentLevelCache().apply(
result1
);
ObjectMapper objectMapper = TestHelper.makeJsonMapper();
Object fromCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedValue),
strategy.getCacheObjectClazz()
);
Result<TopNResultValue> fromCacheResult = strategy.pullFromSegmentLevelCache().apply(fromCacheValue);
Assert.assertEquals(result1, fromCacheResult);
final Result<TopNResultValue> result2 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
Arrays.asList(
ImmutableMap.<String, Object>of(
"test", dimValue,
"metric1", 2,
"post", 10
)
)
)
);
Object preparedResultCacheValue = strategy.prepareForCache(true).apply(
result2
);
Object fromResultCacheValue = objectMapper.readValue(
objectMapper.writeValueAsBytes(preparedResultCacheValue),
strategy.getCacheObjectClazz()
);
Result<TopNResultValue> fromResultCacheResult = strategy.pullFromCache(true).apply(fromResultCacheValue);
Assert.assertEquals(result2, fromResultCacheResult);
}
static class MockQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;