Fix time-extraction topN with non-STRING outputType. (#7257)

Similar to the other bugs fixed in #6220, but this one was missed. With this bug,
"extraction" dimensionSpecs on the "__time" column that declare a non-STRING
outputType (e.g. LONG) could sometimes have their values output as STRING instead,
causing results to be incompletely merged.
This commit is contained in:
Gian Merlino 2019-03-13 16:53:07 -04:00 committed by Fangjin Yang
parent 3895914aa2
commit 98a1b5537f
2 changed files with 74 additions and 11 deletions

View File

@ -22,25 +22,36 @@ package org.apache.druid.query.topn;
import org.apache.druid.query.ColumnSelectorPlus;
import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.column.ValueType;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<String, Aggregator[]>, TopNParams>
public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<Comparable, Aggregator[]>, TopNParams>
{
public static final int[] EMPTY_INTS = new int[]{};
private static final int[] EMPTY_INTS = new int[]{};
private final TopNQuery query;
private final Function<Object, Comparable<?>> dimensionValueConverter;
/**
 * Creates the algorithm for the given storage adapter and topN query, and prepares the converter
 * that turns extracted STRING dimension values into the output type requested by the query's
 * dimension spec.
 *
 * @param storageAdapter passed through to the superclass constructor
 * @param query the topN query; its dimension spec's output type is the conversion target
 */
public TimeExtractionTopNAlgorithm(StorageAdapter storageAdapter, TopNQuery query)
{
super(storageAdapter);
this.query = query;
// This strategy is used for ExtractionFns on the __time column. They always return STRING, so we need to convert
// from STRING to the desired output type.
this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(
ValueType.STRING,
query.getDimensionSpec().getOutputType()
);
}
@Override
@SuppressWarnings("unchecked")
public TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor)
{
return new TopNParams(
@ -63,13 +74,18 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
}
@Override
protected Map<String, Aggregator[]> makeDimValAggregateStore(TopNParams params)
@SuppressWarnings("unchecked")
protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
{
return new HashMap<>();
return params.getSelectorPlus().getColumnSelectorStrategy().makeDimExtractionAggregateStore();
}
@Override
protected long scanAndAggregate(TopNParams params, int[] dimValSelector, Map<String, Aggregator[]> aggregatesStore)
protected long scanAndAggregate(
TopNParams params,
int[] dimValSelector,
Map<Comparable, Aggregator[]> aggregatesStore
)
{
if (params.getCardinality() < 0) {
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
@ -80,7 +96,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
long processedRows = 0;
while (!cursor.isDone()) {
final String key = dimSelector.lookupName(dimSelector.getRow().get(0));
final Comparable key = dimensionValueConverter.apply(dimSelector.lookupName(dimSelector.getRow().get(0)));
Aggregator[] theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
@ -102,11 +118,11 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
protected void updateResults(
TopNParams params,
int[] dimValSelector,
Map<String, Aggregator[]> aggregatesStore,
Map<Comparable, Aggregator[]> aggregatesStore,
TopNResultBuilder resultBuilder
)
{
for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null) {
Object[] vals = new Object[aggs.length];
@ -124,7 +140,7 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
}
@Override
protected void closeAggregators(Map<String, Aggregator[]> stringMap)
protected void closeAggregators(Map<Comparable, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {

View File

@ -2364,6 +2364,53 @@ public class TopNQueryRunnerTest
TestHelper.assertExpectedResults(expectedResults, list, "Failed to match");
}
/**
 * Runs a topN query over an extraction dimensionSpec on the {@code __time} column whose declared
 * output type is LONG. The JavaScript extraction function maps every input to the string "42", so
 * the merged result is expected to contain exactly one row whose "t" value is the long 42.
 */
@Test
public void testTopNDimExtractionTimeToOneLong()
{
  final TopNQuery query = new TopNQueryBuilder()
      .dataSource(QueryRunnerTestHelper.dataSource)
      .granularity(QueryRunnerTestHelper.allGran)
      .dimension(
          new ExtractionDimensionSpec(
              ColumnHolder.TIME_COLUMN_NAME,
              "t",
              ValueType.LONG,
              new JavaScriptExtractionFn(
                  "function(f) { return \"42\"; }",
                  false,
                  JavaScriptConfig.getEnabledInstance()
              )
          )
      )
      .metric("rows")
      .threshold(10)
      .intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
      .aggregators(commonAggregators)
      .postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
      .build();

  // A single row is expected, keyed by the long 42 rather than the string "42".
  final List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
      new Result<>(
          DateTimes.of("2011-01-12T00:00:00.000Z"),
          new TopNResultValue(
              Collections.<Map<String, Object>>singletonList(
                  ImmutableMap.of(
                      "addRowsIndexConstant", 504542.5071372986D,
                      "index", 503332.5071372986D,
                      "t", 42L,
                      "uniques", QueryRunnerTestHelper.UNIQUES_9,
                      "rows", 1209L
                  )
              )
          )
      )
  );

  final List<Result<TopNResultValue>> list = runWithMerge(query).toList();

  // assertEquals takes (expected, actual); keeping that order makes a failure message report
  // the expected and actual values the right way round.
  Assert.assertEquals(1, list.size());
  Assert.assertEquals("Didn't merge results", 1, list.get(0).getValue().getValue().size());
  TestHelper.assertExpectedResults(expectedResults, list, "Failed to match");
}
@Test
public void testTopNCollapsingDimExtraction()
{