mirror of https://github.com/apache/druid.git
Fix four bugs with numeric dimension output types. (#6220)
* Fix four bugs with numeric dimension output types. This patch includes the following bug fixes: - TopNColumnSelectorStrategyFactory: Cast dimension values to the output type during dimExtractionScanAndAggregate instead of updateDimExtractionResults. This fixes a bug where, for example, grouping on doubles-cast-to-longs would fail to merge two doubles that should have been combined into the same long value. - TopNQueryEngine: Use DimExtractionTopNAlgorithm when treating string columns as numeric dimensions. This fixes a similar bug: grouping on string-cast-to-long would fail to merge two strings that should have been combined. - GroupByQuery: Cast numeric types to the expected output type before comparing them in compareDimsForLimitPushDown. This fixes #6123. - GroupByQueryQueryToolChest: Convert Jackson-deserialized dimension values into the proper output type. This fixes an inconsistency between results that came from cache vs. not-cache: for example, Jackson sometimes deserializes integers as Integers and sometimes as Longs. And the following code-cleanup changes, related to the fixes above: - DimensionHandlerUtils: Introduce convertObjectToType, compareObjectsAsType, and converterFromTypeToType to make it easier to handle casting operations. - TopN in general: Rename various "dimName" variables to "dimValue" where they actually represent dimension values. The old names were confusing. * Remove unused imports.
This commit is contained in:
parent
c3aaf8122d
commit
23ba6f7ad7
|
@ -379,7 +379,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
final List<String> orderedFieldNames = new ArrayList<>();
|
||||
final Set<Integer> dimsInOrderBy = new HashSet<>();
|
||||
final List<Boolean> needsReverseList = new ArrayList<>();
|
||||
final List<Boolean> isNumericField = new ArrayList<>();
|
||||
final List<ValueType> dimensionTypes = new ArrayList<>();
|
||||
final List<StringComparator> comparators = new ArrayList<>();
|
||||
|
||||
for (OrderByColumnSpec orderSpec : limitSpec.getColumns()) {
|
||||
|
@ -391,7 +391,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
dimsInOrderBy.add(dimIndex);
|
||||
needsReverseList.add(needsReverse);
|
||||
final ValueType type = dimensions.get(dimIndex).getOutputType();
|
||||
isNumericField.add(ValueType.isNumeric(type));
|
||||
dimensionTypes.add(type);
|
||||
comparators.add(orderSpec.getDimensionComparator());
|
||||
}
|
||||
}
|
||||
|
@ -401,7 +401,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
orderedFieldNames.add(dimensions.get(i).getOutputName());
|
||||
needsReverseList.add(false);
|
||||
final ValueType type = dimensions.get(i).getOutputType();
|
||||
isNumericField.add(ValueType.isNumeric(type));
|
||||
dimensionTypes.add(type);
|
||||
comparators.add(StringComparators.LEXICOGRAPHIC);
|
||||
}
|
||||
}
|
||||
|
@ -418,7 +418,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return compareDimsForLimitPushDown(
|
||||
orderedFieldNames,
|
||||
needsReverseList,
|
||||
isNumericField,
|
||||
dimensionTypes,
|
||||
comparators,
|
||||
lhs,
|
||||
rhs
|
||||
|
@ -436,7 +436,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
final int cmp = compareDimsForLimitPushDown(
|
||||
orderedFieldNames,
|
||||
needsReverseList,
|
||||
isNumericField,
|
||||
dimensionTypes,
|
||||
comparators,
|
||||
lhs,
|
||||
rhs
|
||||
|
@ -465,7 +465,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
return compareDimsForLimitPushDown(
|
||||
orderedFieldNames,
|
||||
needsReverseList,
|
||||
isNumericField,
|
||||
dimensionTypes,
|
||||
comparators,
|
||||
lhs,
|
||||
rhs
|
||||
|
@ -532,28 +532,12 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
private static int compareDims(List<DimensionSpec> dimensions, Row lhs, Row rhs)
|
||||
{
|
||||
for (DimensionSpec dimension : dimensions) {
|
||||
final int dimCompare;
|
||||
if (dimension.getOutputType() == ValueType.LONG) {
|
||||
dimCompare = Comparators.<Long>naturalNullsFirst().compare(
|
||||
DimensionHandlerUtils.convertObjectToLong(lhs.getRaw(dimension.getOutputName())),
|
||||
DimensionHandlerUtils.convertObjectToLong(rhs.getRaw(dimension.getOutputName()))
|
||||
);
|
||||
} else if (dimension.getOutputType() == ValueType.FLOAT) {
|
||||
dimCompare = Comparators.<Float>naturalNullsFirst().compare(
|
||||
DimensionHandlerUtils.convertObjectToFloat(lhs.getRaw(dimension.getOutputName())),
|
||||
DimensionHandlerUtils.convertObjectToFloat(rhs.getRaw(dimension.getOutputName()))
|
||||
);
|
||||
} else if (dimension.getOutputType() == ValueType.DOUBLE) {
|
||||
dimCompare = Comparators.<Double>naturalNullsFirst().compare(
|
||||
DimensionHandlerUtils.convertObjectToDouble(lhs.getRaw(dimension.getOutputName())),
|
||||
DimensionHandlerUtils.convertObjectToDouble(rhs.getRaw(dimension.getOutputName()))
|
||||
);
|
||||
} else {
|
||||
dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
|
||||
lhs.getRaw(dimension.getOutputName()),
|
||||
rhs.getRaw(dimension.getOutputName())
|
||||
);
|
||||
}
|
||||
//noinspection unchecked
|
||||
final int dimCompare = DimensionHandlerUtils.compareObjectsAsType(
|
||||
lhs.getRaw(dimension.getOutputName()),
|
||||
rhs.getRaw(dimension.getOutputName()),
|
||||
dimension.getOutputType()
|
||||
);
|
||||
if (dimCompare != 0) {
|
||||
return dimCompare;
|
||||
}
|
||||
|
@ -565,7 +549,7 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
private static int compareDimsForLimitPushDown(
|
||||
final List<String> fields,
|
||||
final List<Boolean> needsReverseList,
|
||||
final List<Boolean> isNumericField,
|
||||
final List<ValueType> dimensionTypes,
|
||||
final List<StringComparator> comparators,
|
||||
Row lhs,
|
||||
Row rhs
|
||||
|
@ -574,17 +558,15 @@ public class GroupByQuery extends BaseQuery<Row>
|
|||
for (int i = 0; i < fields.size(); i++) {
|
||||
final String fieldName = fields.get(i);
|
||||
final StringComparator comparator = comparators.get(i);
|
||||
final ValueType dimensionType = dimensionTypes.get(i);
|
||||
|
||||
final int dimCompare;
|
||||
final Object lhsObj = lhs.getRaw(fieldName);
|
||||
final Object rhsObj = rhs.getRaw(fieldName);
|
||||
|
||||
if (isNumericField.get(i)) {
|
||||
if (ValueType.isNumeric(dimensionType)) {
|
||||
if (comparator.equals(StringComparators.NUMERIC)) {
|
||||
dimCompare = ((Ordering) Comparators.naturalNullsFirst()).compare(
|
||||
lhsObj,
|
||||
rhsObj
|
||||
);
|
||||
dimCompare = DimensionHandlerUtils.compareObjectsAsType(lhsObj, rhsObj, dimensionType);
|
||||
} else {
|
||||
dimCompare = comparator.compare(String.valueOf(lhsObj), String.valueOf(rhsObj));
|
||||
}
|
||||
|
|
|
@ -58,6 +58,7 @@ import io.druid.query.extraction.ExtractionFn;
|
|||
import io.druid.query.groupby.resource.GroupByQueryResource;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategy;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategySelector;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -457,8 +458,13 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
|
|||
Map<String, Object> event = Maps.newLinkedHashMap();
|
||||
Iterator<DimensionSpec> dimsIter = dims.iterator();
|
||||
while (dimsIter.hasNext() && results.hasNext()) {
|
||||
final DimensionSpec factory = dimsIter.next();
|
||||
event.put(factory.getOutputName(), results.next());
|
||||
final DimensionSpec dimensionSpec = dimsIter.next();
|
||||
|
||||
// Must convert generic Jackson-deserialized type into the proper type.
|
||||
event.put(
|
||||
dimensionSpec.getOutputName(),
|
||||
DimensionHandlerUtils.convertObjectToType(results.next(), dimensionSpec.getOutputType())
|
||||
);
|
||||
}
|
||||
|
||||
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
|
||||
|
|
|
@ -692,25 +692,7 @@ public class GroupByQueryEngineV2
|
|||
final ValueType outputType = dimSpec.getOutputType();
|
||||
rowMap.compute(
|
||||
dimSpec.getOutputName(),
|
||||
(dimName, baseVal) -> {
|
||||
switch (outputType) {
|
||||
case STRING:
|
||||
baseVal = DimensionHandlerUtils.convertObjectToString(baseVal);
|
||||
break;
|
||||
case LONG:
|
||||
baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal);
|
||||
break;
|
||||
case FLOAT:
|
||||
baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal);
|
||||
break;
|
||||
case DOUBLE:
|
||||
baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal);
|
||||
break;
|
||||
default:
|
||||
throw new IAE("Unsupported type: " + outputType);
|
||||
}
|
||||
return baseVal;
|
||||
}
|
||||
(dimName, baseVal) -> DimensionHandlerUtils.convertObjectToType(baseVal, outputType)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -593,29 +593,10 @@ public class RowBasedGrouperHelper
|
|||
{
|
||||
final Function<Comparable, Comparable>[] functions = new Function[valueTypes.size()];
|
||||
for (int i = 0; i < functions.length; i++) {
|
||||
ValueType type = valueTypes.get(i);
|
||||
// Subquery post-aggs aren't added to the rowSignature (see rowSignatureFor() in GroupByQueryHelper) because
|
||||
// their types aren't known, so default to String handling.
|
||||
type = type == null ? ValueType.STRING : type;
|
||||
switch (type) {
|
||||
case STRING:
|
||||
functions[i] = input -> DimensionHandlerUtils.convertObjectToString(input);
|
||||
break;
|
||||
|
||||
case LONG:
|
||||
functions[i] = input -> DimensionHandlerUtils.convertObjectToLong(input);
|
||||
break;
|
||||
|
||||
case FLOAT:
|
||||
functions[i] = input -> DimensionHandlerUtils.convertObjectToFloat(input);
|
||||
break;
|
||||
|
||||
case DOUBLE:
|
||||
functions[i] = input -> DimensionHandlerUtils.convertObjectToDouble(input);
|
||||
break;
|
||||
default:
|
||||
throw new IAE("invalid type: [%s]", type);
|
||||
}
|
||||
final ValueType type = valueTypes.get(i) == null ? ValueType.STRING : valueTypes.get(i);
|
||||
functions[i] = input -> DimensionHandlerUtils.convertObjectToType(input, type);
|
||||
}
|
||||
return functions;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.topn.types.TopNColumnSelectorStrategy;
|
||||
|
@ -110,14 +109,8 @@ public class DimExtractionTopNAlgorithm
|
|||
)
|
||||
{
|
||||
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
|
||||
final boolean needsResultTypeConversion = needsResultTypeConversion(params);
|
||||
final Function<Object, Object> valueTransformer = TopNMapFn.getValueTransformer(
|
||||
query.getDimensionSpec().getOutputType()
|
||||
);
|
||||
|
||||
selectorPlus.getColumnSelectorStrategy().updateDimExtractionResults(
|
||||
aggregatesStore,
|
||||
needsResultTypeConversion ? valueTransformer : null,
|
||||
resultBuilder
|
||||
);
|
||||
}
|
||||
|
@ -136,11 +129,4 @@ public class DimExtractionTopNAlgorithm
|
|||
public void cleanup(TopNParams params)
|
||||
{
|
||||
}
|
||||
|
||||
private boolean needsResultTypeConversion(TopNParams params)
|
||||
{
|
||||
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
|
||||
TopNColumnSelectorStrategy strategy = selectorPlus.getColumnSelectorStrategy();
|
||||
return query.getDimensionSpec().getOutputType() != strategy.getValueType();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,19 +26,19 @@ import java.util.Map;
|
|||
public class DimValHolder
|
||||
{
|
||||
private final Object topNMetricVal;
|
||||
private final Comparable dimName;
|
||||
private final Comparable dimValue;
|
||||
private final Object dimValIndex;
|
||||
private final Map<String, Object> metricValues;
|
||||
|
||||
public DimValHolder(
|
||||
Object topNMetricVal,
|
||||
Comparable dimName,
|
||||
Comparable dimValue,
|
||||
Object dimValIndex,
|
||||
Map<String, Object> metricValues
|
||||
)
|
||||
{
|
||||
this.topNMetricVal = topNMetricVal;
|
||||
this.dimName = dimName;
|
||||
this.dimValue = dimValue;
|
||||
this.dimValIndex = dimValIndex;
|
||||
this.metricValues = metricValues;
|
||||
}
|
||||
|
@ -48,9 +48,9 @@ public class DimValHolder
|
|||
return topNMetricVal;
|
||||
}
|
||||
|
||||
public Comparable getDimName()
|
||||
public Comparable getDimValue()
|
||||
{
|
||||
return dimName;
|
||||
return dimValue;
|
||||
}
|
||||
|
||||
public Object getDimValIndex()
|
||||
|
@ -66,14 +66,14 @@ public class DimValHolder
|
|||
public static class Builder
|
||||
{
|
||||
private Object topNMetricVal;
|
||||
private Comparable dimName;
|
||||
private Comparable dimValue;
|
||||
private Object dimValIndex;
|
||||
private Map<String, Object> metricValues;
|
||||
|
||||
public Builder()
|
||||
{
|
||||
topNMetricVal = null;
|
||||
dimName = null;
|
||||
dimValue = null;
|
||||
dimValIndex = null;
|
||||
metricValues = null;
|
||||
}
|
||||
|
@ -84,9 +84,9 @@ public class DimValHolder
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder withDimName(Comparable dimName)
|
||||
public Builder withDimValue(Comparable dimValue)
|
||||
{
|
||||
this.dimName = dimName;
|
||||
this.dimValue = dimValue;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -104,7 +104,7 @@ public class DimValHolder
|
|||
|
||||
public DimValHolder build()
|
||||
{
|
||||
return new DimValHolder(topNMetricVal, dimName, dimValIndex, metricValues);
|
||||
return new DimValHolder(topNMetricVal, dimValue, dimValIndex, metricValues);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.druid.collections.NonBlockingPool;
|
||||
import io.druid.collections.ResourceHolder;
|
||||
|
@ -37,7 +36,6 @@ import io.druid.segment.Cursor;
|
|||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FilteredOffset;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.data.Offset;
|
||||
import io.druid.segment.historical.HistoricalColumnSelector;
|
||||
|
@ -736,10 +734,6 @@ public class PooledTopNAlgorithm
|
|||
final int[] aggregatorSizes = params.getAggregatorSizes();
|
||||
final DimensionSelector dimSelector = params.getDimSelector();
|
||||
|
||||
final ValueType outType = query.getDimensionSpec().getOutputType();
|
||||
final boolean needsResultConversion = outType != ValueType.STRING;
|
||||
final Function<Object, Object> valueTransformer = TopNMapFn.getValueTransformer(outType);
|
||||
|
||||
for (int i = 0; i < positions.length; i++) {
|
||||
int position = positions[i];
|
||||
if (position >= 0) {
|
||||
|
@ -749,14 +743,9 @@ public class PooledTopNAlgorithm
|
|||
position += aggregatorSizes[j];
|
||||
}
|
||||
|
||||
Object retVal = dimSelector.lookupName(i);
|
||||
if (needsResultConversion) {
|
||||
retVal = valueTransformer.apply(retVal);
|
||||
}
|
||||
|
||||
|
||||
// Output type must be STRING in order for PooledTopNAlgorithm to make sense; so no need to convert value.
|
||||
resultBuilder.addEntry(
|
||||
(Comparable) retVal,
|
||||
dimSelector.lookupName(i),
|
||||
i,
|
||||
vals
|
||||
);
|
||||
|
@ -854,18 +843,6 @@ public class PooledTopNAlgorithm
|
|||
private int numValuesPerPass;
|
||||
private TopNMetricSpecBuilder<int[]> arrayProvider;
|
||||
|
||||
public Builder()
|
||||
{
|
||||
selectorPlus = null;
|
||||
cursor = null;
|
||||
resultsBufHolder = null;
|
||||
resultsBuf = null;
|
||||
aggregatorSizes = null;
|
||||
numBytesPerRecord = 0;
|
||||
numValuesPerPass = 0;
|
||||
arrayProvider = null;
|
||||
}
|
||||
|
||||
public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus)
|
||||
{
|
||||
this.selectorPlus = selectorPlus;
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.query.Result;
|
||||
|
@ -66,32 +65,22 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
|
|||
this.threshold = threshold;
|
||||
this.pQueue = new PriorityQueue<>(
|
||||
threshold + 1,
|
||||
new Comparator<DimValHolder>()
|
||||
{
|
||||
@Override
|
||||
public int compare(
|
||||
DimValHolder o1,
|
||||
DimValHolder o2
|
||||
)
|
||||
{
|
||||
return comparator.compare(o2.getDimName(), o1.getDimName());
|
||||
}
|
||||
}
|
||||
(o1, o2) -> comparator.compare(o2.getDimValue(), o1.getDimValue())
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopNResultBuilder addEntry(
|
||||
Comparable dimNameObj,
|
||||
Comparable dimValueObj,
|
||||
Object dimValIndex,
|
||||
Object[] metricVals
|
||||
)
|
||||
{
|
||||
final String dimName = Objects.toString(dimNameObj, null);
|
||||
final String dimValue = Objects.toString(dimValueObj, null);
|
||||
final Map<String, Object> metricValues = Maps.newHashMapWithExpectedSize(metricVals.length + 1);
|
||||
|
||||
if (shouldAdd(dimName)) {
|
||||
metricValues.put(dimSpec.getOutputName(), dimName);
|
||||
if (shouldAdd(dimValue)) {
|
||||
metricValues.put(dimSpec.getOutputName(), dimValueObj);
|
||||
final int extra = metricVals.length % LOOP_UNROLL_COUNT;
|
||||
switch (extra) {
|
||||
case 7:
|
||||
|
@ -126,7 +115,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
|
|||
metricValues.put(aggFactoryNames[i + 7], metricVals[i + 7]);
|
||||
}
|
||||
|
||||
pQueue.add(new DimValHolder.Builder().withDimName(dimName).withMetricValues(metricValues).build());
|
||||
pQueue.add(new DimValHolder.Builder().withDimValue(dimValue).withMetricValues(metricValues).build());
|
||||
if (pQueue.size() > threshold) {
|
||||
pQueue.poll();
|
||||
}
|
||||
|
@ -143,7 +132,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
|
|||
|
||||
if (shouldAdd(dimensionValue)) {
|
||||
pQueue.add(
|
||||
new DimValHolder.Builder().withDimName(dimensionValue)
|
||||
new DimValHolder.Builder().withDimValue(dimensionValue)
|
||||
.withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
|
||||
.build()
|
||||
);
|
||||
|
@ -167,30 +156,11 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
|
|||
final DimValHolder[] holderValueArray = pQueue.toArray(new DimValHolder[0]);
|
||||
Arrays.sort(
|
||||
holderValueArray,
|
||||
new Comparator<DimValHolder>()
|
||||
{
|
||||
@Override
|
||||
public int compare(DimValHolder o1, DimValHolder o2)
|
||||
{
|
||||
return comparator.compare(o1.getDimName(), o2.getDimName());
|
||||
}
|
||||
}
|
||||
|
||||
(o1, o2) -> comparator.compare(o1.getDimValue(), o2.getDimValue())
|
||||
);
|
||||
return new Result(
|
||||
timestamp, new TopNResultValue(
|
||||
Lists.transform(
|
||||
Arrays.asList(holderValueArray),
|
||||
new Function<DimValHolder, Object>()
|
||||
{
|
||||
@Override
|
||||
public Object apply(DimValHolder dimValHolder)
|
||||
{
|
||||
return dimValHolder.getMetricValues();
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
return new Result<>(
|
||||
timestamp,
|
||||
new TopNResultValue(Lists.transform(Arrays.asList(holderValueArray), DimValHolder::getMetricValues))
|
||||
);
|
||||
}
|
||||
|
||||
|
|
|
@ -19,46 +19,16 @@
|
|||
|
||||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.topn.types.TopNColumnSelectorStrategyFactory;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Objects;
|
||||
|
||||
public class TopNMapFn
|
||||
{
|
||||
public static Function<Object, Object> getValueTransformer(ValueType outputType)
|
||||
{
|
||||
switch (outputType) {
|
||||
case STRING:
|
||||
return STRING_TRANSFORMER;
|
||||
case LONG:
|
||||
return LONG_TRANSFORMER;
|
||||
case FLOAT:
|
||||
return FLOAT_TRANSFORMER;
|
||||
case DOUBLE:
|
||||
return DOUBLE_TRANSFORMER;
|
||||
default:
|
||||
throw new IAE("invalid type: %s", outputType);
|
||||
}
|
||||
}
|
||||
|
||||
private static Function<Object, Object> STRING_TRANSFORMER = Objects::toString;
|
||||
|
||||
private static Function<Object, Object> LONG_TRANSFORMER = DimensionHandlerUtils::convertObjectToLong;
|
||||
|
||||
private static Function<Object, Object> FLOAT_TRANSFORMER = DimensionHandlerUtils::convertObjectToFloat;
|
||||
|
||||
private static Function<Object, Object> DOUBLE_TRANSFORMER = DimensionHandlerUtils::convertObjectToDouble;
|
||||
|
||||
private static final TopNColumnSelectorStrategyFactory STRATEGY_FACTORY = new TopNColumnSelectorStrategyFactory();
|
||||
|
||||
private final TopNQuery query;
|
||||
private final TopNAlgorithm topNAlgorithm;
|
||||
|
||||
|
@ -75,7 +45,7 @@ public class TopNMapFn
|
|||
public Result<TopNResultValue> apply(final Cursor cursor, final @Nullable TopNQueryMetrics queryMetrics)
|
||||
{
|
||||
final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
|
||||
STRATEGY_FACTORY,
|
||||
new TopNColumnSelectorStrategyFactory(query.getDimensionSpec().getOutputType()),
|
||||
query.getDimensionSpec(),
|
||||
cursor.getColumnSelectorFactory()
|
||||
);
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
@ -48,9 +47,9 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
private final String metricName;
|
||||
private final List<PostAggregator> postAggs;
|
||||
private final PriorityQueue<DimValHolder> pQueue;
|
||||
private final Comparator<DimValHolder> dimValComparator;
|
||||
private final Comparator<DimValHolder> dimValHolderComparator;
|
||||
private final String[] aggFactoryNames;
|
||||
private static final Comparator<Comparable> dimNameComparator = new Comparator<Comparable>()
|
||||
private static final Comparator<Comparable> dimValueComparator = new Comparator<Comparable>()
|
||||
{
|
||||
@Override
|
||||
public int compare(Comparable o1, Comparable o2)
|
||||
|
@ -65,6 +64,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
} else if (null == o2) {
|
||||
retval = 1;
|
||||
} else {
|
||||
//noinspection unchecked
|
||||
retval = o1.compareTo(o2);
|
||||
}
|
||||
return retval;
|
||||
|
@ -91,30 +91,26 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
this.postAggs = AggregatorUtil.pruneDependentPostAgg(postAggs, this.metricName);
|
||||
this.threshold = threshold;
|
||||
this.metricComparator = comparator;
|
||||
this.dimValComparator = new Comparator<DimValHolder>()
|
||||
{
|
||||
@Override
|
||||
public int compare(DimValHolder d1, DimValHolder d2)
|
||||
{
|
||||
int retVal = metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
|
||||
this.dimValHolderComparator = (d1, d2) -> {
|
||||
//noinspection unchecked
|
||||
int retVal = metricComparator.compare(d1.getTopNMetricVal(), d2.getTopNMetricVal());
|
||||
|
||||
if (retVal == 0) {
|
||||
retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
|
||||
}
|
||||
|
||||
return retVal;
|
||||
if (retVal == 0) {
|
||||
retVal = dimValueComparator.compare(d1.getDimValue(), d2.getDimValue());
|
||||
}
|
||||
|
||||
return retVal;
|
||||
};
|
||||
|
||||
// The logic in addEntry first adds, then removes if needed. So it can at any point have up to threshold + 1 entries.
|
||||
pQueue = new PriorityQueue<>(this.threshold + 1, this.dimValComparator);
|
||||
pQueue = new PriorityQueue<>(this.threshold + 1, dimValHolderComparator);
|
||||
}
|
||||
|
||||
private static final int LOOP_UNROLL_COUNT = 8;
|
||||
|
||||
@Override
|
||||
public TopNNumericResultBuilder addEntry(
|
||||
Comparable dimName,
|
||||
Comparable dimValueObj,
|
||||
Object dimValIndex,
|
||||
Object[] metricVals
|
||||
)
|
||||
|
@ -126,7 +122,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
|
||||
final Map<String, Object> metricValues = Maps.newHashMapWithExpectedSize(metricVals.length + postAggs.size() + 1);
|
||||
|
||||
metricValues.put(dimSpec.getOutputName(), dimName);
|
||||
metricValues.put(dimSpec.getOutputName(), dimValueObj);
|
||||
|
||||
final int extra = metricVals.length % LOOP_UNROLL_COUNT;
|
||||
|
||||
|
@ -173,7 +169,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
if (shouldAdd(topNMetricVal)) {
|
||||
DimValHolder dimValHolder = new DimValHolder.Builder()
|
||||
.withTopNMetricVal(topNMetricVal)
|
||||
.withDimName(dimName)
|
||||
.withDimValue(dimValueObj)
|
||||
.withDimValIndex(dimValIndex)
|
||||
.withMetricValues(metricValues)
|
||||
.build();
|
||||
|
@ -202,7 +198,7 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
if (shouldAdd(dimValue)) {
|
||||
final DimValHolder valHolder = new DimValHolder.Builder()
|
||||
.withTopNMetricVal(dimValue)
|
||||
.withDimName((Comparable) dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName()))
|
||||
.withDimValue((Comparable) dimensionAndMetricValueExtractor.getDimensionValue(dimSpec.getOutputName()))
|
||||
.withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
|
||||
.build();
|
||||
pQueue.add(valHolder);
|
||||
|
@ -224,39 +220,24 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
|
|||
{
|
||||
final DimValHolder[] holderValueArray = pQueue.toArray(new DimValHolder[0]);
|
||||
Arrays.sort(
|
||||
holderValueArray, new Comparator<DimValHolder>()
|
||||
{
|
||||
@Override
|
||||
public int compare(DimValHolder d1, DimValHolder d2)
|
||||
{
|
||||
// Values flipped compared to earlier
|
||||
int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
|
||||
holderValueArray,
|
||||
(d1, d2) -> {
|
||||
// Metric values flipped compared to dimValueHolderComparator.
|
||||
|
||||
if (retVal == 0) {
|
||||
retVal = dimNameComparator.compare(d1.getDimName(), d2.getDimName());
|
||||
}
|
||||
//noinspection unchecked
|
||||
int retVal = metricComparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
|
||||
|
||||
return retVal;
|
||||
if (retVal == 0) {
|
||||
retVal = dimValueComparator.compare(d1.getDimValue(), d2.getDimValue());
|
||||
}
|
||||
|
||||
return retVal;
|
||||
}
|
||||
);
|
||||
List<DimValHolder> holderValues = Arrays.asList(holderValueArray);
|
||||
|
||||
// Pull out top aggregated values
|
||||
final List<Map<String, Object>> values = Lists.transform(
|
||||
holderValues,
|
||||
new Function<DimValHolder, Map<String, Object>>()
|
||||
{
|
||||
@Override
|
||||
public Map<String, Object> apply(DimValHolder valHolder)
|
||||
{
|
||||
return valHolder.getMetricValues();
|
||||
}
|
||||
}
|
||||
);
|
||||
return new Result<TopNResultValue>(
|
||||
timestamp,
|
||||
new TopNResultValue(values)
|
||||
);
|
||||
final List<Map<String, Object>> values = Lists.transform(holderValues, DimValHolder::getMetricValues);
|
||||
return new Result<>(timestamp, new TopNResultValue(values));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -142,6 +142,10 @@ public class TopNQueryEngine
|
|||
&& columnCapabilities.isDictionaryEncoded())) {
|
||||
// Use DimExtraction for non-Strings and for non-dictionary-encoded Strings.
|
||||
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
|
||||
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
|
||||
// Use DimExtraction when the dimension output type is a non-String. (It's like an extractionFn: there can be
|
||||
// a many-to-one mapping, since numeric types can't represent all possible values of other types.)
|
||||
topNAlgorithm = new DimExtractionTopNAlgorithm(adapter, query);
|
||||
} else if (selector.isAggregateAllMetrics()) {
|
||||
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
|
||||
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
|
||||
|
|
|
@ -50,6 +50,7 @@ import io.druid.query.aggregation.PostAggregator;
|
|||
import io.druid.query.cache.CacheKeyBuilder;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
@ -388,11 +389,6 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
Iterator<Object> inputIter = results.iterator();
|
||||
DateTime timestamp = granularity.toDateTime(((Number) inputIter.next()).longValue());
|
||||
|
||||
// Need a value transformer to convert generic Jackson-deserialized type into the proper type.
|
||||
final Function<Object, Object> dimValueTransformer = TopNMapFn.getValueTransformer(
|
||||
query.getDimensionSpec().getOutputType()
|
||||
);
|
||||
|
||||
while (inputIter.hasNext()) {
|
||||
List<Object> result = (List<Object>) inputIter.next();
|
||||
Map<String, Object> vals = Maps.newLinkedHashMap();
|
||||
|
@ -400,7 +396,11 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
|
|||
Iterator<AggregatorFactory> aggIter = aggs.iterator();
|
||||
Iterator<Object> resultIter = result.iterator();
|
||||
|
||||
vals.put(query.getDimensionSpec().getOutputName(), dimValueTransformer.apply(resultIter.next()));
|
||||
// Must convert generic Jackson-deserialized type into the proper type.
|
||||
vals.put(
|
||||
query.getDimensionSpec().getOutputName(),
|
||||
DimensionHandlerUtils.convertObjectToType(resultIter.next(), query.getDimensionSpec().getOutputType())
|
||||
);
|
||||
|
||||
while (aggIter.hasNext() && resultIter.hasNext()) {
|
||||
final AggregatorFactory factory = aggIter.next();
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.Iterator;
|
|||
public interface TopNResultBuilder
|
||||
{
|
||||
TopNResultBuilder addEntry(
|
||||
Comparable dimNameObj,
|
||||
Comparable dimValueObj,
|
||||
Object dimValIndex,
|
||||
Object[] metricVals
|
||||
);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package io.druid.query.topn.types;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.topn.BaseTopNAlgorithm;
|
||||
import io.druid.query.topn.TopNParams;
|
||||
|
@ -29,6 +29,7 @@ import io.druid.segment.BaseDoubleColumnValueSelector;
|
|||
import io.druid.segment.BaseFloatColumnValueSelector;
|
||||
import io.druid.segment.BaseLongColumnValueSelector;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
|
||||
|
@ -37,12 +38,32 @@ import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
|
|||
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
public abstract class NumericTopNColumnSelectorStrategy<
|
||||
ValueSelectorType,
|
||||
DimExtractionAggregateStoreType extends Map<?, Aggregator[]>>
|
||||
implements TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType>
|
||||
{
|
||||
public static TopNColumnSelectorStrategy ofType(final ValueType selectorType, final ValueType dimensionType)
|
||||
{
|
||||
final Function<Object, Comparable<?>> converter = DimensionHandlerUtils.converterFromTypeToType(
|
||||
selectorType,
|
||||
dimensionType
|
||||
);
|
||||
|
||||
switch (selectorType) {
|
||||
case LONG:
|
||||
return new OfLong(converter);
|
||||
case FLOAT:
|
||||
return new OfFloat(converter);
|
||||
case DOUBLE:
|
||||
return new OfDouble(converter);
|
||||
default:
|
||||
throw new IAE("No strategy for type[%s]", selectorType);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCardinality(ValueSelectorType selector)
|
||||
{
|
||||
|
@ -132,7 +153,6 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
@Override
|
||||
public void updateDimExtractionResults(
|
||||
final DimExtractionAggregateStoreType aggregatesStore,
|
||||
final Function<Object, Object> valueTransformer,
|
||||
final TopNResultBuilder resultBuilder
|
||||
)
|
||||
{
|
||||
|
@ -144,11 +164,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
vals[i] = aggs[i].get();
|
||||
}
|
||||
|
||||
Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
|
||||
if (valueTransformer != null) {
|
||||
key = (Comparable) valueTransformer.apply(key);
|
||||
}
|
||||
|
||||
final Comparable key = convertAggregatorStoreKeyToColumnValue(entry.getKey());
|
||||
resultBuilder.addEntry(key, key, vals);
|
||||
}
|
||||
}
|
||||
|
@ -159,10 +175,11 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
static class OfFloat
|
||||
extends NumericTopNColumnSelectorStrategy<BaseFloatColumnValueSelector, Int2ObjectMap<Aggregator[]>>
|
||||
{
|
||||
@Override
|
||||
public ValueType getValueType()
|
||||
private final Function<Object, Comparable<?>> converter;
|
||||
|
||||
OfFloat(final Function<Object, Comparable<?>> converter)
|
||||
{
|
||||
return ValueType.FLOAT;
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -174,7 +191,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
@Override
|
||||
Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
|
||||
{
|
||||
return Float.intBitsToFloat((Integer) aggregatorStoreKey);
|
||||
return converter.apply(Float.intBitsToFloat((Integer) aggregatorStoreKey));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -193,10 +210,11 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
static class OfLong
|
||||
extends NumericTopNColumnSelectorStrategy<BaseLongColumnValueSelector, Long2ObjectMap<Aggregator[]>>
|
||||
{
|
||||
@Override
|
||||
public ValueType getValueType()
|
||||
private final Function<Object, Comparable<?>> converter;
|
||||
|
||||
OfLong(final Function<Object, Comparable<?>> converter)
|
||||
{
|
||||
return ValueType.LONG;
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -208,7 +226,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
@Override
|
||||
Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
|
||||
{
|
||||
return (Long) aggregatorStoreKey;
|
||||
return converter.apply(aggregatorStoreKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -227,10 +245,11 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
static class OfDouble
|
||||
extends NumericTopNColumnSelectorStrategy<BaseDoubleColumnValueSelector, Long2ObjectMap<Aggregator[]>>
|
||||
{
|
||||
@Override
|
||||
public ValueType getValueType()
|
||||
private final Function<Object, Comparable<?>> converter;
|
||||
|
||||
OfDouble(final Function<Object, Comparable<?>> converter)
|
||||
{
|
||||
return ValueType.DOUBLE;
|
||||
this.converter = converter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -242,7 +261,7 @@ public abstract class NumericTopNColumnSelectorStrategy<
|
|||
@Override
|
||||
Comparable convertAggregatorStoreKeyToColumnValue(Object aggregatorStoreKey)
|
||||
{
|
||||
return Double.longBitsToDouble((Long) aggregatorStoreKey);
|
||||
return converter.apply(Double.longBitsToDouble((Long) aggregatorStoreKey));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,36 +19,38 @@
|
|||
|
||||
package io.druid.query.topn.types;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.topn.BaseTopNAlgorithm;
|
||||
import io.druid.query.topn.TopNParams;
|
||||
import io.druid.query.topn.TopNQuery;
|
||||
import io.druid.query.topn.TopNResultBuilder;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class StringTopNColumnSelectorStrategy
|
||||
implements TopNColumnSelectorStrategy<DimensionSelector, Map<String, Aggregator[]>>
|
||||
implements TopNColumnSelectorStrategy<DimensionSelector, Map<Comparable, Aggregator[]>>
|
||||
{
|
||||
private final Function<Object, Comparable<?>> dimensionValueConverter;
|
||||
|
||||
public StringTopNColumnSelectorStrategy(final ValueType dimensionType)
|
||||
{
|
||||
this.dimensionValueConverter = DimensionHandlerUtils.converterFromTypeToType(ValueType.STRING, dimensionType);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCardinality(DimensionSelector selector)
|
||||
{
|
||||
return selector.getValueCardinality();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueType getValueType()
|
||||
{
|
||||
return ValueType.STRING;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, StorageAdapter storageAdapter)
|
||||
{
|
||||
|
@ -71,9 +73,9 @@ public class StringTopNColumnSelectorStrategy
|
|||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Aggregator[]> makeDimExtractionAggregateStore()
|
||||
public Map<Comparable, Aggregator[]> makeDimExtractionAggregateStore()
|
||||
{
|
||||
return Maps.newHashMap();
|
||||
return new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -82,7 +84,7 @@ public class StringTopNColumnSelectorStrategy
|
|||
DimensionSelector selector,
|
||||
Cursor cursor,
|
||||
Aggregator[][] rowSelector,
|
||||
Map<String, Aggregator[]> aggregatesStore
|
||||
Map<Comparable, Aggregator[]> aggregatesStore
|
||||
)
|
||||
{
|
||||
if (selector.getValueCardinality() != DimensionSelector.CARDINALITY_UNKNOWN) {
|
||||
|
@ -94,12 +96,11 @@ public class StringTopNColumnSelectorStrategy
|
|||
|
||||
@Override
|
||||
public void updateDimExtractionResults(
|
||||
final Map<String, Aggregator[]> aggregatesStore,
|
||||
final Function<Object, Object> valueTransformer,
|
||||
final Map<Comparable, Aggregator[]> aggregatesStore,
|
||||
final TopNResultBuilder resultBuilder
|
||||
)
|
||||
{
|
||||
for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
|
||||
for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
|
||||
Aggregator[] aggs = entry.getValue();
|
||||
if (aggs != null) {
|
||||
Object[] vals = new Object[aggs.length];
|
||||
|
@ -107,16 +108,8 @@ public class StringTopNColumnSelectorStrategy
|
|||
vals[i] = aggs[i].get();
|
||||
}
|
||||
|
||||
Comparable key = entry.getKey();
|
||||
if (valueTransformer != null) {
|
||||
key = (Comparable) valueTransformer.apply(key);
|
||||
}
|
||||
|
||||
resultBuilder.addEntry(
|
||||
key,
|
||||
key,
|
||||
vals
|
||||
);
|
||||
final Comparable key = dimensionValueConverter.apply(entry.getKey());
|
||||
resultBuilder.addEntry(key, key, vals);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -126,7 +119,7 @@ public class StringTopNColumnSelectorStrategy
|
|||
Cursor cursor,
|
||||
DimensionSelector selector,
|
||||
Aggregator[][] rowSelector,
|
||||
Map<String, Aggregator[]> aggregatesStore
|
||||
Map<Comparable, Aggregator[]> aggregatesStore
|
||||
)
|
||||
{
|
||||
long processedRows = 0;
|
||||
|
@ -136,7 +129,7 @@ public class StringTopNColumnSelectorStrategy
|
|||
final int dimIndex = dimValues.get(i);
|
||||
Aggregator[] theAggregators = rowSelector[dimIndex];
|
||||
if (theAggregators == null) {
|
||||
final String key = selector.lookupName(dimIndex);
|
||||
final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
|
||||
theAggregators = aggregatesStore.get(key);
|
||||
if (theAggregators == null) {
|
||||
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
|
||||
|
@ -159,7 +152,7 @@ public class StringTopNColumnSelectorStrategy
|
|||
TopNQuery query,
|
||||
Cursor cursor,
|
||||
DimensionSelector selector,
|
||||
Map<String, Aggregator[]> aggregatesStore
|
||||
Map<Comparable, Aggregator[]> aggregatesStore
|
||||
)
|
||||
{
|
||||
long processedRows = 0;
|
||||
|
@ -167,7 +160,7 @@ public class StringTopNColumnSelectorStrategy
|
|||
final IndexedInts dimValues = selector.getRow();
|
||||
for (int i = 0, size = dimValues.size(); i < size; ++i) {
|
||||
final int dimIndex = dimValues.get(i);
|
||||
final String key = selector.lookupName(dimIndex);
|
||||
final Comparable<?> key = dimensionValueConverter.apply(selector.lookupName(dimIndex));
|
||||
|
||||
Aggregator[] theAggregators = aggregatesStore.get(key);
|
||||
if (theAggregators == null) {
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.query.topn.types;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.dimension.ColumnSelectorStrategy;
|
||||
import io.druid.query.topn.TopNParams;
|
||||
|
@ -27,9 +26,7 @@ import io.druid.query.topn.TopNQuery;
|
|||
import io.druid.query.topn.TopNResultBuilder;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.StorageAdapter;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Map;
|
||||
|
||||
public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggregateStoreType extends Map>
|
||||
|
@ -39,8 +36,6 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
|
|||
|
||||
int getCardinality(ValueSelectorType selector);
|
||||
|
||||
ValueType getValueType();
|
||||
|
||||
/**
|
||||
* Used by DimExtractionTopNAlgorithm.
|
||||
*
|
||||
|
@ -107,13 +102,11 @@ public interface TopNColumnSelectorStrategy<ValueSelectorType, DimExtractionAggr
|
|||
* Read entries from the aggregates store, adding the keys and associated values to the resultBuilder, applying the
|
||||
* valueTransformer to the keys if present
|
||||
*
|
||||
* @param aggregatesStore Map created by makeDimExtractionAggregateStore()
|
||||
* @param valueTransformer Converts keys to different types, if null no conversion is needed
|
||||
* @param resultBuilder TopN result builder
|
||||
* @param aggregatesStore Map created by makeDimExtractionAggregateStore()
|
||||
* @param resultBuilder TopN result builder
|
||||
*/
|
||||
void updateDimExtractionResults(
|
||||
DimExtractionAggregateStoreType aggregatesStore,
|
||||
@Nullable Function<Object, Object> valueTransformer,
|
||||
TopNResultBuilder resultBuilder
|
||||
);
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.query.topn.types;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
|
||||
import io.druid.segment.ColumnValueSelector;
|
||||
|
@ -27,23 +28,39 @@ import io.druid.segment.column.ValueType;
|
|||
|
||||
public class TopNColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
|
||||
{
|
||||
private final ValueType dimensionType;
|
||||
|
||||
public TopNColumnSelectorStrategyFactory(final ValueType dimensionType)
|
||||
{
|
||||
this.dimensionType = Preconditions.checkNotNull(dimensionType, "dimensionType");
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
|
||||
ColumnCapabilities capabilities, ColumnValueSelector selector
|
||||
)
|
||||
{
|
||||
ValueType type = capabilities.getType();
|
||||
switch (type) {
|
||||
final ValueType selectorType = capabilities.getType();
|
||||
|
||||
switch (selectorType) {
|
||||
case STRING:
|
||||
return new StringTopNColumnSelectorStrategy();
|
||||
// Return strategy that reads strings and outputs dimensionTypes.
|
||||
return new StringTopNColumnSelectorStrategy(dimensionType);
|
||||
case LONG:
|
||||
return new NumericTopNColumnSelectorStrategy.OfLong();
|
||||
case FLOAT:
|
||||
return new NumericTopNColumnSelectorStrategy.OfFloat();
|
||||
case DOUBLE:
|
||||
return new NumericTopNColumnSelectorStrategy.OfDouble();
|
||||
if (ValueType.isNumeric(dimensionType)) {
|
||||
// Return strategy that aggregates using the _output_ type, because this allows us to collapse values
|
||||
// properly (numeric types cannot represent all values of other numeric types).
|
||||
return NumericTopNColumnSelectorStrategy.ofType(dimensionType, dimensionType);
|
||||
} else {
|
||||
// Return strategy that aggregates using the _input_ type. Here we are assuming that the output type can
|
||||
// represent all possible values of the input type. This will be true for STRING, which is the only
|
||||
// non-numeric type currently supported.
|
||||
return NumericTopNColumnSelectorStrategy.ofType(selectorType, dimensionType);
|
||||
}
|
||||
default:
|
||||
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
|
||||
throw new IAE("Cannot create query type helper from invalid type [%s]", selectorType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,12 +19,14 @@
|
|||
|
||||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.primitives.Doubles;
|
||||
import com.google.common.primitives.Floats;
|
||||
import io.druid.common.guava.GuavaUtils;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.java.util.common.guava.Comparators;
|
||||
import io.druid.java.util.common.parsers.ParseException;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.dimension.ColumnSelectorStrategy;
|
||||
|
@ -38,6 +40,7 @@ import javax.annotation.Nullable;
|
|||
import java.math.BigDecimal;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
public final class DimensionHandlerUtils
|
||||
{
|
||||
|
@ -132,9 +135,10 @@ public final class DimensionHandlerUtils
|
|||
* in a query engine. See GroupByStrategyFactory for a reference.
|
||||
*
|
||||
* @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
|
||||
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
|
||||
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
|
||||
* @param columnSelectorFactory Used to create value selectors for columns.
|
||||
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
|
||||
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
|
||||
* @param columnSelectorFactory Used to create value selectors for columns.
|
||||
*
|
||||
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
|
||||
*/
|
||||
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
|
||||
|
@ -302,6 +306,64 @@ public final class DimensionHandlerUtils
|
|||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static Comparable<?> convertObjectToType(
|
||||
@Nullable final Object obj,
|
||||
final ValueType type,
|
||||
final boolean reportParseExceptions
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(type, "type");
|
||||
|
||||
switch (type) {
|
||||
case LONG:
|
||||
return convertObjectToLong(obj, reportParseExceptions);
|
||||
case FLOAT:
|
||||
return convertObjectToFloat(obj, reportParseExceptions);
|
||||
case DOUBLE:
|
||||
return convertObjectToDouble(obj, reportParseExceptions);
|
||||
case STRING:
|
||||
return convertObjectToString(obj);
|
||||
default:
|
||||
throw new IAE("Type[%s] is not supported for dimensions!", type);
|
||||
}
|
||||
}
|
||||
|
||||
public static int compareObjectsAsType(
|
||||
@Nullable final Object lhs,
|
||||
@Nullable final Object rhs,
|
||||
final ValueType type
|
||||
)
|
||||
{
|
||||
//noinspection unchecked
|
||||
return Comparators.<Comparable>naturalNullsFirst().compare(
|
||||
convertObjectToType(lhs, type),
|
||||
convertObjectToType(rhs, type)
|
||||
);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static Comparable<?> convertObjectToType(
|
||||
@Nullable final Object obj,
|
||||
final ValueType type
|
||||
)
|
||||
{
|
||||
return convertObjectToType(obj, Preconditions.checkNotNull(type, "type"), false);
|
||||
}
|
||||
|
||||
public static Function<Object, Comparable<?>> converterFromTypeToType(
|
||||
final ValueType fromType,
|
||||
final ValueType toType
|
||||
)
|
||||
{
|
||||
if (fromType == toType) {
|
||||
//noinspection unchecked
|
||||
return (Function) Function.identity();
|
||||
} else {
|
||||
return obj -> convertObjectToType(obj, toType);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public static Double convertObjectToDouble(@Nullable Object valObj)
|
||||
{
|
||||
|
|
|
@ -5133,6 +5133,138 @@ public class TopNQueryRunnerTest
|
|||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSortOnDoubleAsLong()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(new DefaultDimensionSpec("index", "index_alias", ValueType.LONG))
|
||||
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
|
||||
.threshold(4)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.build();
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
|
||||
new Result<>(
|
||||
DateTimes.of("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 59L)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 67L)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 68L)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 69L)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSortOnTimeAsLong()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(new DefaultDimensionSpec("__time", "__time_alias", ValueType.LONG))
|
||||
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
|
||||
.threshold(4)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.build();
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
|
||||
new Result<>(
|
||||
DateTimes.of("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("__time_alias", DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("__time_alias", DateTimes.of("2011-01-13T00:00:00.000Z").getMillis())
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("__time_alias", DateTimes.of("2011-01-14T00:00:00.000Z").getMillis())
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("__time_alias", DateTimes.of("2011-01-15T00:00:00.000Z").getMillis())
|
||||
.build()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSortOnStringAsDouble()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(new DefaultDimensionSpec("market", "alias", ValueType.DOUBLE))
|
||||
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
|
||||
.threshold(4)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.build();
|
||||
|
||||
final Map<String, Object> nullAliasMap = new HashMap<>();
|
||||
nullAliasMap.put("alias", null);
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
|
||||
new Result<>(
|
||||
DateTimes.of("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(Collections.singletonList(nullAliasMap))
|
||||
)
|
||||
);
|
||||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSortOnDoubleAsDouble()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(new DefaultDimensionSpec("index", "index_alias", ValueType.DOUBLE))
|
||||
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
|
||||
.threshold(4)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.build();
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
|
||||
new Result<>(
|
||||
DateTimes.of("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 59.021022d)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 59.266595d)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 67.73117d)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("index_alias", 68.573162d)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFullOnTopNLongTimeColumnWithExFn()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue