From 0e5bd8b4d421c24e036e4104752f0e40c6d6a66a Mon Sep 17 00:00:00 2001 From: Jonathan Wei Date: Wed, 21 Dec 2016 19:11:37 -0800 Subject: [PATCH] Add dimension type-based interface for query processing (#3570) * Add dimension type-based interface for query processing * PR comment changes * Address PR comments * Use getters for QueryDimensionInfo * Split DimensionQueryHelper into base interface and query-specific interfaces * Treat empty rows as nulls in v2 groupby * Reduce boxing in SearchQueryRunner * Add GroupBy empty row handling to MultiValuedDimensionTest * Address PR comments * PR comments and refactoring * More PR comments * PR comments --- .../io/druid/query/ColumnSelectorPlus.java | 85 ++++ .../FilteredAggregatorFactory.java | 161 ++----- .../cardinality/CardinalityAggregator.java | 69 ++- .../CardinalityAggregatorFactory.java | 50 +-- .../CardinalityBufferAggregator.java | 13 +- ...alityAggregatorColumnSelectorStrategy.java | 46 ++ ...gregatorColumnSelectorStrategyFactory.java | 43 ++ ...alityAggregatorColumnSelectorStrategy.java | 76 ++++ .../dimension/ColumnSelectorStrategy.java | 27 ++ .../ColumnSelectorStrategyFactory.java | 27 ++ ...ingValueMatcherColumnSelectorStrategy.java | 160 +++++++ .../ValueMatcherColumnSelectorStrategy.java | 45 ++ ...eMatcherColumnSelectorStrategyFactory.java | 43 ++ .../query/filter/ValueMatcherFactory.java | 2 +- .../epinephelinae/GroupByQueryEngineV2.java | 331 +++++++++++--- .../druid/query/search/SearchQueryRunner.java | 419 ++++++++++++------ .../druid/query/select/SelectQueryEngine.java | 108 +++-- .../AggregateTopNMetricFirstAlgorithm.java | 10 +- .../druid/query/topn/BaseTopNAlgorithm.java | 8 +- .../topn/DimExtractionTopNAlgorithm.java | 66 +-- .../druid/query/topn/PooledTopNAlgorithm.java | 20 +- .../topn/TimeExtractionTopNAlgorithm.java | 5 +- .../io/druid/query/topn/TopNAlgorithm.java | 5 +- .../java/io/druid/query/topn/TopNMapFn.java | 18 +- .../java/io/druid/query/topn/TopNParams.java | 19 +- .../io/druid/query/topn/TopNQueryEngine.java | 1 - .../StringTopNColumnSelectorStrategy.java | 69 +++ .../types/TopNColumnSelectorStrategy.java | 82 ++++ .../query/topn/types/TopNStrategyFactory.java | 42 ++ .../io/druid/segment/ColumnValueSelector.java | 27 ++ .../io/druid/segment/DimensionHandler.java | 24 +- .../druid/segment/DimensionHandlerUtil.java | 51 --- .../druid/segment/DimensionHandlerUtils.java | 159 +++++++ .../io/druid/segment/DimensionIndexer.java | 37 +- .../io/druid/segment/DimensionMerger.java | 10 +- .../druid/segment/DimensionMergerLegacy.java | 10 +- .../io/druid/segment/DimensionMergerV9.java | 4 +- .../io/druid/segment/DimensionSelector.java | 2 +- .../io/druid/segment/FloatColumnSelector.java | 2 +- .../java/io/druid/segment/IndexMerger.java | 4 +- .../io/druid/segment/LongColumnSelector.java | 2 +- .../druid/segment/ObjectColumnSelector.java | 2 +- .../segment/QueryableIndexStorageAdapter.java | 125 ++---- .../druid/segment/SimpleQueryableIndex.java | 2 +- .../druid/segment/StringDimensionHandler.java | 17 - .../segment/StringDimensionMergerLegacy.java | 8 + .../segment/incremental/IncrementalIndex.java | 12 +- .../druid/query/MultiValuedDimensionTest.java | 2 + .../CardinalityAggregatorBenchmark.java | 16 +- .../CardinalityAggregatorTest.java | 114 ++++- .../query/groupby/GroupByQueryRunnerTest.java | 40 ++ .../query/search/SearchQueryRunnerTest.java | 29 ++ 52 files changed, 2016 insertions(+), 733 deletions(-) create mode 100644 processing/src/main/java/io/druid/query/ColumnSelectorPlus.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java create mode 100644 processing/src/main/java/io/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategyFactory.java create mode 100644 processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java create mode 100644 processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java create mode 100644 processing/src/main/java/io/druid/query/topn/types/TopNStrategyFactory.java create mode 100644 processing/src/main/java/io/druid/segment/ColumnValueSelector.java delete mode 100644 processing/src/main/java/io/druid/segment/DimensionHandlerUtil.java create mode 100644 processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java diff --git a/processing/src/main/java/io/druid/query/ColumnSelectorPlus.java b/processing/src/main/java/io/druid/query/ColumnSelectorPlus.java new file mode 100644 index 00000000000..6dd2872aa96 --- /dev/null +++ b/processing/src/main/java/io/druid/query/ColumnSelectorPlus.java @@ -0,0 +1,85 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query; + +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.segment.ColumnValueSelector; + +/** + * A grouping of various related objects used during query processing for a single dimension, used for convenience. + * + * Each ColumnSelectorPlus is associated with a single dimension. + */ +public class ColumnSelectorPlus +{ + /** + * Helper object that handles row value operations that pertain to a specific query type for this + * dimension within query processing engines. + */ + private final ColumnSelectorStrategyClass columnSelectorStrategy; + + /** + * Internal name of the dimension. + */ + private final String name; + + /** + * Name of the dimension to be returned in query results. + */ + private final String outputName; + + /** + * Column value selector for this dimension, e.g. a DimensionSelector for String dimensions. + */ + private final ColumnValueSelector selector; + + public ColumnSelectorPlus( + String columnName, + String outputName, + ColumnSelectorStrategyClass columnSelectorStrategy, + ColumnValueSelector selector + ) + { + this.columnSelectorStrategy = columnSelectorStrategy; + this.name = columnName; + this.outputName = outputName; + this.selector = selector; + } + + public ColumnSelectorStrategyClass getColumnSelectorStrategy() + { + return columnSelectorStrategy; + } + + public String getName() + { + return name; + } + + public String getOutputName() + { + return outputName; + } + + public ColumnValueSelector getSelector() + { + return selector; + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index 9e830f71df8..2ea3b27c0c1 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -21,28 +21,28 @@ package io.druid.query.aggregation; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.DimFilter; import io.druid.query.filter.DruidLongPredicate; import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.ValueMatcher; +import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; +import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; import io.druid.query.filter.ValueMatcherFactory; import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ValueType; -import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; import java.nio.ByteBuffer; -import java.util.BitSet; import java.util.Comparator; import java.util.List; -import java.util.Objects; public class FilteredAggregatorFactory extends AggregatorFactory { @@ -211,6 +211,9 @@ public class FilteredAggregatorFactory extends AggregatorFactory private static class FilteredAggregatorValueMatcherFactory implements ValueMatcherFactory { + private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY = + new ValueMatcherColumnSelectorStrategyFactory(); + private final ColumnSelectorFactory columnSelectorFactory; public FilteredAggregatorValueMatcherFactory(ColumnSelectorFactory columnSelectorFactory) @@ -228,67 +231,16 @@ public class FilteredAggregatorFactory extends AggregatorFactory ); } - final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector( - new DefaultDimensionSpec(dimension, dimension) - ); + ColumnSelectorPlus[] selector = + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + ImmutableList.of(DefaultDimensionSpec.of(dimension)), + columnSelectorFactory + ); - // Compare "value" as null if it's empty. - final String valueString = Strings.emptyToNull(value); - // Missing columns match a null or empty string value, and don't match anything else. - if (selector == null) { - return new BooleanValueMatcher(valueString == null); - } - - final int cardinality = selector.getValueCardinality(); - - if (cardinality >= 0) { - // Dictionary-encoded dimension. Compare by id instead of by value to save time. - final int valueId = selector.lookupId(valueString); - - return new ValueMatcher() - { - @Override - public boolean matches() - { - final IndexedInts row = selector.getRow(); - final int size = row.size(); - if (size == 0) { - // null should match empty rows in multi-value columns - return valueString == null; - } else { - for (int i = 0; i < size; ++i) { - if (row.get(i) == valueId) { - return true; - } - } - return false; - } - } - }; - } else { - // Not dictionary-encoded. Skip the optimization. - return new ValueMatcher() - { - @Override - public boolean matches() - { - final IndexedInts row = selector.getRow(); - final int size = row.size(); - if (size == 0) { - // null should match empty rows in multi-value columns - return valueString == null; - } else { - for (int i = 0; i < size; ++i) { - if (Objects.equals(selector.lookupName(row.get(i)), valueString)) { - return true; - } - } - return false; - } - } - }; - } + final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); + return strategy.getValueMatcher(dimension, columnSelectorFactory, value); } public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory) @@ -298,80 +250,21 @@ public class FilteredAggregatorFactory extends AggregatorFactory case LONG: return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate()); case STRING: - return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate()); + ColumnSelectorPlus[] selector = + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + ImmutableList.of(DefaultDimensionSpec.of(dimension)), + columnSelectorFactory + ); + + + final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); + return strategy.getValueMatcher(dimension, columnSelectorFactory, predicateFactory); default: return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null)); } } - public ValueMatcher makeStringValueMatcher(final String dimension, final Predicate predicate) - { - final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector( - new DefaultDimensionSpec(dimension, dimension) - ); - - final boolean doesMatchNull = predicate.apply(null); - - if (selector == null) { - return new BooleanValueMatcher(doesMatchNull); - } - - final int cardinality = selector.getValueCardinality(); - - if (cardinality >= 0) { - // Dictionary-encoded dimension. Check every value; build a bitset of matching ids. - final BitSet valueIds = new BitSet(cardinality); - for (int i = 0; i < cardinality; i++) { - if (predicate.apply(selector.lookupName(i))) { - valueIds.set(i); - } - } - - return new ValueMatcher() - { - @Override - public boolean matches() - { - final IndexedInts row = selector.getRow(); - final int size = row.size(); - if (size == 0) { - // null should match empty rows in multi-value columns - return doesMatchNull; - } else { - for (int i = 0; i < size; ++i) { - if (valueIds.get(row.get(i))) { - return true; - } - } - return false; - } - } - }; - } else { - // Not dictionary-encoded. Skip the optimization. - return new ValueMatcher() - { - @Override - public boolean matches() - { - final IndexedInts row = selector.getRow(); - final int size = row.size(); - if (size == 0) { - // null should match empty rows in multi-value columns - return doesMatchNull; - } else { - for (int i = 0; i < size; ++i) { - if (predicate.apply(selector.lookupName(row.get(i)))) { - return true; - } - } - return false; - } - } - }; - } - } - private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate) { return Filters.getLongPredicateMatcher( diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java index 791bfa80ad8..20bc344beeb 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregator.java @@ -23,74 +23,57 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import io.druid.query.aggregation.Aggregator; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; -import io.druid.segment.DimensionSelector; -import io.druid.segment.data.IndexedInts; -import java.util.Arrays; import java.util.List; public class CardinalityAggregator implements Aggregator { - private static final String NULL_STRING = "\u0000"; - - private final List selectorList; + private final String name; + private final List> selectorPlusList; private final boolean byRow; - private static final HashFunction hashFn = Hashing.murmur3_128(); - public static final char SEPARATOR = '\u0001'; + public static final HashFunction hashFn = Hashing.murmur3_128(); - protected static void hashRow(List selectorList, HyperLogLogCollector collector) + protected static void hashRow( + List> selectorPlusList, + HyperLogLogCollector collector + ) { final Hasher hasher = hashFn.newHasher(); - for (int k = 0; k < selectorList.size(); ++k) { + for (int k = 0; k < selectorPlusList.size(); ++k) { if (k != 0) { hasher.putByte((byte) 0); } - final DimensionSelector selector = selectorList.get(k); - final IndexedInts row = selector.getRow(); - final int size = row.size(); - // nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases. - if (size == 1) { - final String value = selector.lookupName(row.get(0)); - hasher.putUnencodedChars(value != null ? value : NULL_STRING); - } else if (size != 0) { - final String[] values = new String[size]; - for (int i = 0; i < size; ++i) { - final String value = selector.lookupName(row.get(i)); - values[i] = value != null ? value : NULL_STRING; - } - // Values need to be sorted to ensure consistent multi-value ordering across different segments - Arrays.sort(values); - for (int i = 0; i < size; ++i) { - if (i != 0) { - hasher.putChar(SEPARATOR); - } - hasher.putUnencodedChars(values[i]); - } - } + + ColumnSelectorPlus selectorPlus = selectorPlusList.get(k); + selectorPlus.getColumnSelectorStrategy().hashRow(selectorPlus.getSelector(), hasher); } collector.add(hasher.hash().asBytes()); } - protected static void hashValues(final List selectors, HyperLogLogCollector collector) + protected static void hashValues( + List> selectorPlusList, + HyperLogLogCollector collector + ) { - for (final DimensionSelector selector : selectors) { - for (final Integer index : selector.getRow()) { - final String value = selector.lookupName(index); - collector.add(hashFn.hashUnencodedChars(value == null ? NULL_STRING : value).asBytes()); - } + for (final ColumnSelectorPlus selectorPlus : selectorPlusList) { + selectorPlus.getColumnSelectorStrategy().hashValues(selectorPlus.getSelector(), collector); } } private HyperLogLogCollector collector; public CardinalityAggregator( - List selectorList, + String name, + List> selectorPlusList, boolean byRow ) { - this.selectorList = selectorList; + this.name = name; + this.selectorPlusList = selectorPlusList; this.collector = HyperLogLogCollector.makeLatestCollector(); this.byRow = byRow; } @@ -99,9 +82,9 @@ public class CardinalityAggregator implements Aggregator public void aggregate() { if (byRow) { - hashRow(selectorList, collector); + hashRow(selectorPlusList, collector); } else { - hashValues(selectorList, collector); + hashValues(selectorPlusList, collector); } } @@ -138,7 +121,7 @@ public class CardinalityAggregator implements Aggregator @Override public Aggregator clone() { - return new CardinalityAggregator(selectorList, byRow); + return new CardinalityAggregator(name, selectorPlusList, byRow); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index e7ea88c5793..d8ccf5bac29 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -23,26 +23,28 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.druid.java.util.common.StringUtils; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; import io.druid.query.aggregation.Aggregators; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; +import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionSelector; +import io.druid.segment.DimensionHandlerUtils; import org.apache.commons.codec.binary.Base64; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -94,6 +96,8 @@ public class CardinalityAggregatorFactory extends AggregatorFactory private static final byte CACHE_TYPE_ID = (byte) 0x8; private static final byte CACHE_KEY_SEPARATOR = (byte) 0xFF; + private static final CardinalityAggregatorColumnSelectorStrategyFactory STRATEGY_FACTORY = + new CardinalityAggregatorColumnSelectorStrategyFactory(); private final String name; private final List fields; @@ -133,44 +137,36 @@ public class CardinalityAggregatorFactory extends AggregatorFactory @Override public Aggregator factorize(final ColumnSelectorFactory columnFactory) { - List selectors = makeDimensionSelectors(columnFactory); + List> selectorPlusList = + Arrays.asList(DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + fields, + columnFactory + )); - if (selectors.isEmpty()) { + if (selectorPlusList.isEmpty()) { return Aggregators.noopAggregator(); } - return new CardinalityAggregator(selectors, byRow); + return new CardinalityAggregator(name, selectorPlusList, byRow); } @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) { - List selectors = makeDimensionSelectors(columnFactory); + List> selectorPlusList = + Arrays.asList(DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + fields, + columnFactory + )); - if (selectors.isEmpty()) { + if (selectorPlusList.isEmpty()) { return Aggregators.noopBufferAggregator(); } - return new CardinalityBufferAggregator(selectors, byRow); - } - - private List makeDimensionSelectors(final ColumnSelectorFactory columnFactory) - { - return Lists.newArrayList( - Iterables.filter( - Iterables.transform( - fields, new Function() - { - @Override - public DimensionSelector apply(DimensionSpec input) - { - return columnFactory.makeDimensionSelector(input); - } - } - ), Predicates.notNull() - ) - ); + return new CardinalityBufferAggregator(selectorPlusList, byRow); } @Override diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java index c791dc650f4..3ea20e2388d 100644 --- a/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java @@ -20,25 +20,26 @@ package io.druid.query.aggregation.cardinality; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; -import io.druid.segment.DimensionSelector; import java.nio.ByteBuffer; import java.util.List; public class CardinalityBufferAggregator implements BufferAggregator { - private final List selectorList; + private final List> selectorPlusList; private final boolean byRow; private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray(); public CardinalityBufferAggregator( - List selectorList, + List> selectorPlusList, boolean byRow ) { - this.selectorList = selectorList; + this.selectorPlusList = selectorPlusList; this.byRow = byRow; } @@ -62,9 +63,9 @@ public class CardinalityBufferAggregator implements BufferAggregator try { final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); if (byRow) { - CardinalityAggregator.hashRow(selectorList, collector); + CardinalityAggregator.hashRow(selectorPlusList, collector); } else { - CardinalityAggregator.hashValues(selectorList, collector); + CardinalityAggregator.hashValues(selectorPlusList, collector); } } finally { diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategy.java new file mode 100644 index 00000000000..c85ce0d739c --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategy.java @@ -0,0 +1,46 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.cardinality.types; + +import com.google.common.hash.Hasher; +import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.segment.ColumnValueSelector; + +public interface CardinalityAggregatorColumnSelectorStrategy extends + ColumnSelectorStrategy +{ + /*** + * Retrieve the current row from dimSelector and add the row values to the hasher. + * + * @param dimSelector Dimension value selector + * @param hasher Hasher used for cardinality aggregator calculations + */ + void hashRow(ValueSelectorType dimSelector, Hasher hasher); + + + /** + * Retrieve the current row from dimSelector and add the row values to HyperLogLogCollector. + * + * @param dimSelector Dimension value selector + * @param collector HLL collector used for cardinality aggregator calculations + */ + void hashValues(ValueSelectorType dimSelector, HyperLogLogCollector collector); +} diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java new file mode 100644 index 00000000000..9a74997855c --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/types/CardinalityAggregatorColumnSelectorStrategyFactory.java @@ -0,0 +1,43 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.cardinality.types; + +import io.druid.java.util.common.IAE; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ValueType; + +public class CardinalityAggregatorColumnSelectorStrategyFactory + implements ColumnSelectorStrategyFactory +{ + @Override + public CardinalityAggregatorColumnSelectorStrategy makeColumnSelectorStrategy( + ColumnCapabilities capabilities + ) + { + ValueType type = capabilities.getType(); + switch(type) { + case STRING: + return new StringCardinalityAggregatorColumnSelectorStrategy(); + default: + throw new IAE("Cannot create query type helper from invalid type [%s]", type); + } + } +} diff --git a/processing/src/main/java/io/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java new file mode 100644 index 00000000000..8127ecb4b73 --- /dev/null +++ b/processing/src/main/java/io/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java @@ -0,0 +1,76 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.cardinality.types; + +import com.google.common.hash.Hasher; +import io.druid.query.aggregation.cardinality.CardinalityAggregator; +import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector; +import io.druid.segment.DimensionSelector; +import io.druid.segment.data.IndexedInts; +import it.unimi.dsi.fastutil.ints.IntIterator; + +import java.util.Arrays; + +public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy +{ + public static final String CARDINALITY_AGG_NULL_STRING = "\u0000"; + public static final char CARDINALITY_AGG_SEPARATOR = '\u0001'; + + @Override + public void hashRow(DimensionSelector dimSelector, Hasher hasher) + { + final IndexedInts row = dimSelector.getRow(); + final int size = row.size(); + // nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases. + if (size == 1) { + final String value = dimSelector.lookupName(row.get(0)); + hasher.putUnencodedChars(nullToSpecial(value)); + } else if (size != 0) { + final String[] values = new String[size]; + for (int i = 0; i < size; ++i) { + final String value = dimSelector.lookupName(row.get(i)); + values[i] = nullToSpecial(value); + } + // Values need to be sorted to ensure consistent multi-value ordering across different segments + Arrays.sort(values); + for (int i = 0; i < size; ++i) { + if (i != 0) { + hasher.putChar(CARDINALITY_AGG_SEPARATOR); + } + hasher.putUnencodedChars(values[i]); + } + } + } + + @Override + public void hashValues(DimensionSelector dimSelector, HyperLogLogCollector collector) + { + for (IntIterator rowIt = dimSelector.getRow().iterator(); rowIt.hasNext(); ) { + int index = rowIt.nextInt(); + final String value = dimSelector.lookupName(index); + collector.add(CardinalityAggregator.hashFn.hashUnencodedChars(nullToSpecial(value)).asBytes()); + } + } + + private String nullToSpecial(String value) + { + return value == null ? CARDINALITY_AGG_NULL_STRING : value; + } +} diff --git a/processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategy.java new file mode 100644 index 00000000000..5e957d30e26 --- /dev/null +++ b/processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategy.java @@ -0,0 +1,27 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.dimension; + +/** + * Base type for strategy objects that handle value type operations pertaining to a specific query type + */ +public interface ColumnSelectorStrategy +{ +} diff --git a/processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategyFactory.java b/processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategyFactory.java new file mode 100644 index 00000000000..7729e05e48d --- /dev/null +++ b/processing/src/main/java/io/druid/query/dimension/ColumnSelectorStrategyFactory.java @@ -0,0 +1,27 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.dimension; + +import io.druid.segment.column.ColumnCapabilities; + +public interface ColumnSelectorStrategyFactory +{ + ColumnSelectorStrategyClass makeColumnSelectorStrategy(ColumnCapabilities capabilities); +} diff --git a/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java new file mode 100644 index 00000000000..8a1207c3542 --- /dev/null +++ b/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java @@ -0,0 +1,160 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.filter; + +import com.google.common.base.Predicate; +import com.google.common.base.Strings; +import io.druid.query.dimension.DefaultDimensionSpec; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.data.IndexedInts; + +import java.util.BitSet; +import java.util.Objects; + +public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherColumnSelectorStrategy +{ + @Override + public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final String value) + { + final String valueStr = Strings.emptyToNull(value); + final DimensionSelector selector = cursor.makeDimensionSelector( + new DefaultDimensionSpec(columnName, columnName) + ); + + // if matching against null, rows with size 0 should also match + final boolean matchNull = Strings.isNullOrEmpty(valueStr); + + final int cardinality = selector.getValueCardinality(); + + if (cardinality >= 0) { + // Dictionary-encoded dimension. Compare by id instead of by value to save time. + final int valueId = selector.lookupId(valueStr); + + return new ValueMatcher() + { + @Override + public boolean matches() + { + final IndexedInts row = selector.getRow(); + final int size = row.size(); + if (size == 0) { + // null should match empty rows in multi-value columns + return matchNull; + } else { + for (int i = 0; i < size; ++i) { + if (row.get(i) == valueId) { + return true; + } + } + return false; + } + } + }; + } else { + // Not dictionary-encoded. Skip the optimization. + return new ValueMatcher() + { + @Override + public boolean matches() + { + final IndexedInts row = selector.getRow(); + final int size = row.size(); + if (size == 0) { + // null should match empty rows in multi-value columns + return matchNull; + } else { + for (int i = 0; i < size; ++i) { + if (Objects.equals(selector.lookupName(row.get(i)), valueStr)) { + return true; + } + } + return false; + } + } + }; + } + } + + @Override + public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory) + { + final DimensionSelector selector = cursor.makeDimensionSelector( + new DefaultDimensionSpec(columnName, columnName) + ); + + final Predicate predicate = predicateFactory.makeStringPredicate(); + final int cardinality = selector.getValueCardinality(); + final boolean matchNull = predicate.apply(null); + + if (cardinality >= 0) { + // Dictionary-encoded dimension. Check every value; build a bitset of matching ids. + final BitSet valueIds = new BitSet(cardinality); + for (int i = 0; i < cardinality; i++) { + if (predicate.apply(selector.lookupName(i))) { + valueIds.set(i); + } + } + + return new ValueMatcher() + { + @Override + public boolean matches() + { + final IndexedInts row = selector.getRow(); + final int size = row.size(); + if (size == 0) { + // null should match empty rows in multi-value columns + return matchNull; + } else { + for (int i = 0; i < size; ++i) { + if (valueIds.get(row.get(i))) { + return true; + } + } + return false; + } + } + }; + } else { + // Not dictionary-encoded. Skip the optimization. + return new ValueMatcher() + { + @Override + public boolean matches() + { + final IndexedInts row = selector.getRow(); + final int size = row.size(); + if (size == 0) { + // null should match empty rows in multi-value columns + return matchNull; + } else { + for (int i = 0; i < size; ++i) { + if (predicate.apply(selector.lookupName(row.get(i)))) { + return true; + } + } + return false; + } + } + }; + } + } +} diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java new file mode 100644 index 00000000000..6e6c747bd4d --- /dev/null +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java @@ -0,0 +1,45 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.filter; + +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.segment.ColumnSelectorFactory; + +public interface ValueMatcherColumnSelectorStrategy extends ColumnSelectorStrategy +{ + /** + * Create a single value ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory. + * + * @param cursor ColumnSelectorFactory for creating dimension value selectors + * @param value Value to match against + * @return ValueMatcher that matches on 'value' + */ + ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, String value); + + + /** + * Create a predicate-based ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory. + * + * @param cursor ColumnSelectorFactory for creating dimension value selectors + * @param predicateFactory A DruidPredicateFactory that provides the filter predicates to be matched + * @return A ValueMatcher that applies the predicate for this DimensionQueryHelper's value type from the predicateFactory + */ + ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory); +} diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java new file mode 100644 index 00000000000..828189471d1 --- /dev/null +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java @@ -0,0 +1,43 @@ +package io.druid.query.filter; + +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.druid.java.util.common.IAE; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ValueType; + +public class ValueMatcherColumnSelectorStrategyFactory + implements ColumnSelectorStrategyFactory +{ + @Override + public ValueMatcherColumnSelectorStrategy makeColumnSelectorStrategy( + ColumnCapabilities capabilities + ) + { + ValueType type = capabilities.getType(); + switch (type) { + case STRING: + return new StringValueMatcherColumnSelectorStrategy(); + default: + throw new IAE("Cannot create query type helper from invalid type [%s]", type); + } + } +} diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java index 62595125360..33e0c57aa8c 100644 --- a/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java @@ -40,7 +40,7 @@ public interface ValueMatcherFactory * An implementation of this method should be able to handle dimensions of various types. * * @param dimension The dimension to filter. - * @param value The value to match against. + * @param value The value to match against, represented as a String. * * @return An object that matches row values on the provided value. */ diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 7fdae22a87d..5c17f5bbc81 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -34,15 +34,21 @@ import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.ResourceClosingSequence; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.strategy.GroupByStrategyV2; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.Cursor; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.DimensionSelector; import io.druid.segment.StorageAdapter; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ValueType; import io.druid.segment.VirtualColumns; -import io.druid.segment.data.EmptyIndexedInts; import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.Filters; import org.joda.time.DateTime; @@ -58,6 +64,19 @@ import java.util.NoSuchElementException; public class GroupByQueryEngineV2 { + private static final GroupByStrategyFactory STRATEGY_FACTORY = new GroupByStrategyFactory(); + + private static GroupByColumnSelectorPlus[] createGroupBySelectorPlus(ColumnSelectorPlus[] baseSelectorPlus) + { + GroupByColumnSelectorPlus[] retInfo = new GroupByColumnSelectorPlus[baseSelectorPlus.length]; + int curPos = 0; + for (int i = 0; i < retInfo.length; i++) { + retInfo[i] = new GroupByColumnSelectorPlus(baseSelectorPlus[i], curPos); + curPos += retInfo[i].getColumnSelectorStrategy().getGroupingKeySize(); + } + return retInfo; + } + private GroupByQueryEngineV2() { // No instantiation @@ -89,7 +108,7 @@ public class GroupByQueryEngineV2 false ); - final Grouper.KeySerde keySerde = new GroupByEngineKeySerde(query.getDimensions().size()); + final ResourceHolder bufferHolder = intermediateResultsBufferPool.take(); final String fudgeTimestampString = Strings.emptyToNull( @@ -115,13 +134,18 @@ public class GroupByQueryEngineV2 @Override public GroupByEngineIterator make() { + ColumnSelectorPlus[] selectorPlus = DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + query.getDimensions(), + cursor + ); return new GroupByEngineIterator( query, config, cursor, bufferHolder.get(), - keySerde, - fudgeTimestamp + fudgeTimestamp, + createGroupBySelectorPlus(selectorPlus) ); } @@ -147,6 +171,177 @@ public class GroupByQueryEngineV2 ); } + private static class GroupByStrategyFactory implements ColumnSelectorStrategyFactory + { + @Override + public GroupByColumnSelectorStrategy makeColumnSelectorStrategy( + ColumnCapabilities capabilities + ) + { + ValueType type = capabilities.getType(); + switch(type) { + case STRING: + return new StringGroupByColumnSelectorStrategy(); + default: + throw new IAE("Cannot create query type helper from invalid type [%s]", type); + } + } + } + + /** + * Contains a collection of query processing methods for type-specific operations used exclusively by + * GroupByQueryEngineV2. + * + * Each GroupByColumnSelectorStrategy is associated with a single dimension. + */ + private interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy + { + /** + * Return the size, in bytes, of this dimension's values in the grouping key. + * + * For example, a String implementation would return 4, the size of an int. + * + * @return size, in bytes, of this dimension's values in the grouping key. + */ + int getGroupingKeySize(); + + /** + * Read a value from a grouping key and add it to the group by query result map, using the output name specified + * in a DimensionSpec. + * + * An implementation may choose to not add anything to the result map + * (e.g., as the String implementation does for empty rows) + * + * selectorPlus provides access to: + * - the keyBufferPosition offset from which to read the value + * - the dimension value selector + * - the DimensionSpec for this dimension from the query + * + * @param selectorPlus dimension info containing the key offset, value selector, and dimension spec + * @param resultMap result map for the group by query being served + * @param key grouping key + */ + void processValueFromGroupingKey( + GroupByColumnSelectorPlus selectorPlus, + ByteBuffer key, + Map resultMap + ); + + /** + * Retrieve a row object from the ColumnSelectorPlus and put it in valuess at columnIndex. + * + * @param selector Value selector for a column. + * @param columnIndex Index of the column within the row values array + * @param valuess Row values array, one index per column + */ + void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess); + + /** + * Read the first value within a row values object (IndexedInts, IndexedLongs, etc.) and write that value + * to the keyBuffer at keyBufferPosition. If rowSize is 0, write GROUP_BY_MISSING_VALUE instead. + * + * If the size of the row is > 0, write 1 to stack[] at columnIndex, otherwise write 0. + * + * @param keyBufferPosition Starting offset for this column's value within the grouping key. + * @param columnIndex Index of the column within the row values array + * @param rowObj Row value object for this column (e.g., IndexedInts) + * @param keyBuffer grouping key + * @param stack array containing the current within-row value index for each column + */ + void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack); + + /** + * If rowValIdx is less than the size of rowObj (haven't handled all of the row values): + * First, read the value at rowValIdx from a rowObj and write that value to the keyBuffer at keyBufferPosition. + * Then return true + * + * Otherwise, return false. + * + * @param keyBufferPosition Starting offset for this column's value within the grouping key. + * @param rowObj Row value object for this column (e.g., IndexedInts) + * @param rowValIdx Index of the current value being grouped on within the row + * @param keyBuffer grouping key + * @return true if rowValIdx < size of rowObj, false otherwise + */ + boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer); + } + + private static class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy + { + private static final int GROUP_BY_MISSING_VALUE = -1; + + @Override + public int getGroupingKeySize() + { + return Ints.BYTES; + } + + @Override + public void processValueFromGroupingKey(GroupByColumnSelectorPlus selectorPlus, ByteBuffer key, Map resultMap) + { + final int id = key.getInt(selectorPlus.getKeyBufferPosition()); + + // GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map. + if (id != GROUP_BY_MISSING_VALUE) { + resultMap.put( + selectorPlus.getOutputName(), + ((DimensionSelector) selectorPlus.getSelector()).lookupName(id) + ); + } else { + resultMap.put(selectorPlus.getOutputName(), ""); + } + } + + @Override + public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess) + { + DimensionSelector dimSelector = (DimensionSelector) selector; + IndexedInts row = dimSelector.getRow(); + valuess[columnIndex] = row; + } + + @Override + public void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack) + { + IndexedInts row = (IndexedInts) rowObj; + int rowSize = row.size(); + + initializeGroupingKeyV2Dimension(row, rowSize, keyBuffer, keyBufferPosition); + stack[columnIndex] = rowSize == 0 ? 0 : 1; + } + + @Override + public boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer) + { + IndexedInts row = (IndexedInts) rowObj; + int rowSize = row.size(); + + if (rowValIdx < rowSize) { + keyBuffer.putInt( + keyBufferPosition, + row.get(rowValIdx) + ); + return true; + } else { + return false; + } + } + + private void initializeGroupingKeyV2Dimension( + final IndexedInts values, + final int rowSize, + final ByteBuffer keyBuffer, + final int keyBufferPosition + ) + { + if (rowSize == 0) { + keyBuffer.putInt(keyBufferPosition, GROUP_BY_MISSING_VALUE); + } else { + keyBuffer.putInt(keyBufferPosition, values.get(0)); + } + } + } + private static class GroupByEngineIterator implements Iterator, Closeable { private final GroupByQuery query; @@ -155,10 +350,10 @@ public class GroupByQueryEngineV2 private final ByteBuffer buffer; private final Grouper.KeySerde keySerde; private final DateTime timestamp; - private final DimensionSelector[] selectors; private final ByteBuffer keyBuffer; private final int[] stack; - private final IndexedInts[] valuess; + private final Object[] valuess; + private final GroupByColumnSelectorPlus[] dims; private int stackp = Integer.MIN_VALUE; private boolean currentRowWasPartiallyAggregated = false; @@ -169,8 +364,8 @@ public class GroupByQueryEngineV2 final GroupByQueryConfig config, final Cursor cursor, final ByteBuffer buffer, - final Grouper.KeySerde keySerde, - final DateTime fudgeTimestamp + final DateTime fudgeTimestamp, + final GroupByColumnSelectorPlus[] dims ) { final int dimCount = query.getDimensions().size(); @@ -179,14 +374,11 @@ public class GroupByQueryEngineV2 this.querySpecificConfig = config.withOverrides(query); this.cursor = cursor; this.buffer = buffer; - this.keySerde = keySerde; + this.keySerde = new GroupByEngineKeySerde(dims); this.keyBuffer = ByteBuffer.allocate(keySerde.keySize()); - this.selectors = new DimensionSelector[dimCount]; - for (int i = 0; i < dimCount; i++) { - this.selectors[i] = cursor.makeDimensionSelector(query.getDimensions().get(i)); - } + this.dims = dims; this.stack = new int[dimCount]; - this.valuess = new IndexedInts[dimCount]; + this.valuess = new Object[dimCount]; // Time is the same for every row in the cursor this.timestamp = fudgeTimestamp != null ? fudgeTimestamp : cursor.getTime(); @@ -226,19 +418,20 @@ outer: // Set up stack, valuess, and first grouping in keyBuffer for this row stackp = stack.length - 1; - for (int i = 0; i < selectors.length; i++) { - final DimensionSelector selector = selectors[i]; - - valuess[i] = selector == null ? EmptyIndexedInts.EMPTY_INDEXED_INTS : selector.getRow(); - - final int position = Ints.BYTES * i; - if (valuess[i].size() == 0) { - stack[i] = 0; - keyBuffer.putInt(position, -1); - } else { - stack[i] = 1; - keyBuffer.putInt(position, valuess[i].get(0)); - } + for (int i = 0; i < dims.length; i++) { + GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy(); + strategy.initColumnValues( + dims[i].getSelector(), + i, + valuess + ); + strategy.initGroupingKeyColumnValue( + dims[i].getKeyBufferPosition(), + i, + valuess[i], + keyBuffer, + stack + ); } } @@ -256,28 +449,29 @@ outer: doAggregate = false; } - if (stackp >= 0 && stack[stackp] < valuess[stackp].size()) { - // Load next value for current slot - keyBuffer.putInt( - Ints.BYTES * stackp, - valuess[stackp].get(stack[stackp]) + if (stackp >= 0) { + doAggregate = dims[stackp].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey( + dims[stackp].getKeyBufferPosition(), + valuess[stackp], + stack[stackp], + keyBuffer ); - stack[stackp]++; - // Reset later slots - for (int i = stackp + 1; i < stack.length; i++) { - final int position = Ints.BYTES * i; - if (valuess[i].size() == 0) { - stack[i] = 0; - keyBuffer.putInt(position, -1); - } else { - stack[i] = 1; - keyBuffer.putInt(position, valuess[i].get(0)); + if (doAggregate) { + stack[stackp]++; + for (int i = stackp + 1; i < stack.length; i++) { + dims[i].getColumnSelectorStrategy().initGroupingKeyColumnValue( + dims[i].getKeyBufferPosition(), + i, + valuess[i], + keyBuffer, + stack + ); } + stackp = stack.length - 1; + } else { + stackp--; } - - stackp = stack.length - 1; - doAggregate = true; } else { stackp--; } @@ -299,15 +493,12 @@ outer: Map theMap = Maps.newLinkedHashMap(); // Add dimensions. - for (int i = 0; i < selectors.length; i++) { - final int id = entry.getKey().getInt(Ints.BYTES * i); - - if (id >= 0) { - theMap.put( - query.getDimensions().get(i).getOutputName(), - selectors[i].lookupName(id) - ); - } + for (GroupByColumnSelectorPlus selectorPlus : dims) { + selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey( + selectorPlus, + entry.getKey(), + theMap + ); } // Add aggregations. @@ -356,9 +547,13 @@ outer: { private final int keySize; - public GroupByEngineKeySerde(final int dimCount) + public GroupByEngineKeySerde(final GroupByColumnSelectorPlus dims[]) { - this.keySize = dimCount * Ints.BYTES; + int keySize = 0; + for (GroupByColumnSelectorPlus selectorPlus : dims) { + keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySize(); + } + this.keySize = keySize; } @Override @@ -400,4 +595,28 @@ outer: // No state, nothing to reset } } + + private static class GroupByColumnSelectorPlus extends ColumnSelectorPlus + { + /** + * Indicates the offset of this dimension's value within the grouping key. + */ + private int keyBufferPosition; + + public GroupByColumnSelectorPlus(ColumnSelectorPlus baseInfo, int keyBufferPosition) + { + super( + baseInfo.getName(), + baseInfo.getOutputName(), + baseInfo.getColumnSelectorStrategy(), + baseInfo.getSelector() + ); + this.keyBufferPosition = keyBufferPosition; + } + + public int getKeyBufferPosition() + { + return keyBufferPosition; + } + } } diff --git a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java index 9e6b7e85920..8bcafa905f9 100644 --- a/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java +++ b/processing/src/main/java/io/druid/query/search/SearchQueryRunner.java @@ -24,7 +24,6 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.collections.bitmap.MutableBitmap; @@ -35,11 +34,14 @@ import io.druid.java.util.common.guava.Accumulator; import io.druid.java.util.common.guava.FunctionalIterable; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.Druids; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.Result; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; import io.druid.query.extraction.ExtractionFn; import io.druid.query.extraction.IdentityExtractionFn; import io.druid.query.filter.Filter; @@ -47,7 +49,9 @@ import io.druid.query.search.search.SearchHit; import io.druid.query.search.search.SearchQuery; import io.druid.query.search.search.SearchQuerySpec; import io.druid.segment.ColumnSelectorBitmapIndexSelector; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.Cursor; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.DimensionSelector; import io.druid.segment.QueryableIndex; import io.druid.segment.Segment; @@ -55,24 +59,96 @@ import io.druid.segment.StorageAdapter; import io.druid.segment.VirtualColumns; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.GenericColumn; +import io.druid.segment.column.ValueType; import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.Filters; -import org.apache.commons.lang.mutable.MutableInt; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap; import org.joda.time.Interval; import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.TreeMap; /** */ public class SearchQueryRunner implements QueryRunner> { + private static final SearchStrategyFactory STRATEGY_FACTORY = new SearchStrategyFactory(); + private static final EmittingLogger log = new EmittingLogger(SearchQueryRunner.class); private final Segment segment; + private static class SearchStrategyFactory implements ColumnSelectorStrategyFactory + { + @Override + public SearchColumnSelectorStrategy makeColumnSelectorStrategy( + ColumnCapabilities capabilities + ) + { + ValueType type = capabilities.getType(); + switch(type) { + case STRING: + return new StringSearchColumnSelectorStrategy(); + default: + throw new IAE("Cannot create query type helper from invalid type [%s]", type); + } + } + } + + public interface SearchColumnSelectorStrategy extends ColumnSelectorStrategy + { + /** + * Read the current row from dimSelector and update the search result set. + * + * For each row value: + * 1. Check if searchQuerySpec accept()s the value + * 2. If so, add the value to the result set and increment the counter for that value + * 3. If the size of the result set reaches the limit after adding a value, return early. + * + * @param outputName Output name for this dimension in the search query being served + * @param dimSelector Dimension value selector + * @param searchQuerySpec Spec for the search query + * @param set The result set of the search query + * @param limit The limit of the search query + */ + void updateSearchResultSet( + String outputName, + ValueSelectorType dimSelector, + SearchQuerySpec searchQuerySpec, + int limit, + Object2IntRBTreeMap set + ); + } + + public static class StringSearchColumnSelectorStrategy implements SearchColumnSelectorStrategy + { + @Override + public void updateSearchResultSet( + String outputName, + DimensionSelector selector, + SearchQuerySpec searchQuerySpec, + int limit, + final Object2IntRBTreeMap set + ) + { + if (selector != null) { + final IndexedInts vals = selector.getRow(); + for (int i = 0; i < vals.size(); ++i) { + final String dimVal = selector.lookupName(vals.get(i)); + if (searchQuerySpec.accept(dimVal)) { + set.addTo(new SearchHit(outputName, dimVal), 1); + if (set.size() >= limit) { + return; + } + } + } + } + } + } + public SearchQueryRunner(Segment segment) { this.segment = segment; @@ -93,7 +169,6 @@ public class SearchQueryRunner implements QueryRunner> final List dimensions = query.getDimensions(); final SearchQuerySpec searchQuerySpec = query.getQuery(); final int limit = query.getLimit(); - final boolean descending = query.isDescending(); final List intervals = query.getQuerySegmentSpec().getIntervals(); if (intervals.size() != 1) { throw new IAE("Should only have one interval, got[%s]", intervals); @@ -103,85 +178,25 @@ public class SearchQueryRunner implements QueryRunner> // Closing this will cause segfaults in unit tests. final QueryableIndex index = segment.asQueryableIndex(); - if (index != null) { - final TreeMap retVal = Maps.newTreeMap(query.getSort().getComparator()); + final StorageAdapter storageAdapter = segment.asStorageAdapter(); - Iterable dimsToSearch; - if (dimensions == null || dimensions.isEmpty()) { - dimsToSearch = Iterables.transform(index.getAvailableDimensions(), Druids.DIMENSION_IDENTITY); - } else { - dimsToSearch = dimensions; + final List bitmapDims = Lists.newArrayList(); + final List nonBitmapDims = Lists.newArrayList(); + partitionDimensionList(index, storageAdapter, dimensions, bitmapDims, nonBitmapDims); + + final Object2IntRBTreeMap retVal = new Object2IntRBTreeMap(query.getSort().getComparator()); + retVal.defaultReturnValue(0); + + // Get results from bitmap supporting dims first + if (!bitmapDims.isEmpty()) { + processBitmapDims(index, filter, interval, bitmapDims, searchQuerySpec, limit, retVal); + // If there are no non-bitmap dims to search, or we've already hit the result limit, just return now + if (nonBitmapDims.size() == 0 || retVal.size() >= limit) { + return makeReturnResult(limit, retVal); } - - final BitmapFactory bitmapFactory = index.getBitmapFactoryForDimensions(); - - final ImmutableBitmap baseFilter = - filter == null ? null : filter.getBitmapIndex(new ColumnSelectorBitmapIndexSelector(bitmapFactory, index)); - - ImmutableBitmap timeFilteredBitmap; - if (!interval.contains(segment.getDataInterval())) { - MutableBitmap timeBitmap = bitmapFactory.makeEmptyMutableBitmap(); - final Column timeColumn = index.getColumn(Column.TIME_COLUMN_NAME); - try (final GenericColumn timeValues = timeColumn.getGenericColumn()) { - - int startIndex = Math.max(0, getStartIndexOfTime(timeValues, interval.getStartMillis(), true)); - int endIndex = Math.min( - timeValues.length() - 1, - getStartIndexOfTime(timeValues, interval.getEndMillis(), false) - ); - - for (int i = startIndex; i <= endIndex; i++) { - timeBitmap.add(i); - } - - final ImmutableBitmap finalTimeBitmap = bitmapFactory.makeImmutableBitmap(timeBitmap); - timeFilteredBitmap = - (baseFilter == null) ? finalTimeBitmap : finalTimeBitmap.intersection(baseFilter); - } - } else { - timeFilteredBitmap = baseFilter; - } - - for (DimensionSpec dimension : dimsToSearch) { - final Column column = index.getColumn(dimension.getDimension()); - if (column == null) { - continue; - } - - final BitmapIndex bitmapIndex = column.getBitmapIndex(); - ExtractionFn extractionFn = dimension.getExtractionFn(); - if (extractionFn == null) { - extractionFn = IdentityExtractionFn.getInstance(); - } - if (bitmapIndex != null) { - for (int i = 0; i < bitmapIndex.getCardinality(); ++i) { - String dimVal = Strings.nullToEmpty(extractionFn.apply(bitmapIndex.getValue(i))); - if (!searchQuerySpec.accept(dimVal)) { - continue; - } - ImmutableBitmap bitmap = bitmapIndex.getBitmap(i); - if (timeFilteredBitmap != null) { - bitmap = bitmapFactory.intersection(Arrays.asList(timeFilteredBitmap, bitmap)); - } - if (bitmap.size() > 0) { - MutableInt counter = new MutableInt(bitmap.size()); - MutableInt prev = retVal.put(new SearchHit(dimension.getOutputName(), dimVal), counter); - if (prev != null) { - counter.add(prev.intValue()); - } - if (retVal.size() >= limit) { - return makeReturnResult(limit, retVal); - } - } - } - } - } - - return makeReturnResult(limit, retVal); } final StorageAdapter adapter = segment.asStorageAdapter(); - if (adapter == null) { log.makeAlert("WTF!? Unable to process search query on segment.") .addData("segment", segment.getIdentifier()) @@ -190,71 +205,7 @@ public class SearchQueryRunner implements QueryRunner> "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." ); } - - final Iterable dimsToSearch; - if (dimensions == null || dimensions.isEmpty()) { - dimsToSearch = Iterables.transform(adapter.getAvailableDimensions(), Druids.DIMENSION_IDENTITY); - } else { - dimsToSearch = dimensions; - } - - final Sequence cursors = adapter.makeCursors( - filter, - interval, - VirtualColumns.EMPTY, - query.getGranularity(), - descending - ); - - final TreeMap retVal = cursors.accumulate( - Maps.newTreeMap(query.getSort().getComparator()), - new Accumulator, Cursor>() - { - @Override - public TreeMap accumulate(TreeMap set, Cursor cursor) - { - if (set.size() >= limit) { - return set; - } - - Map dimSelectors = Maps.newHashMap(); - for (DimensionSpec dim : dimsToSearch) { - dimSelectors.put( - dim.getOutputName(), - cursor.makeDimensionSelector(dim) - ); - } - - while (!cursor.isDone()) { - for (Map.Entry entry : dimSelectors.entrySet()) { - final DimensionSelector selector = entry.getValue(); - - if (selector != null) { - final IndexedInts vals = selector.getRow(); - for (int i = 0; i < vals.size(); ++i) { - final String dimVal = selector.lookupName(vals.get(i)); - if (searchQuerySpec.accept(dimVal)) { - MutableInt counter = new MutableInt(1); - MutableInt prev = set.put(new SearchHit(entry.getKey(), dimVal), counter); - if (prev != null) { - counter.add(prev.intValue()); - } - if (set.size() >= limit) { - return set; - } - } - } - } - } - - cursor.advance(); - } - - return set; - } - } - ); - + processNonBitmapDims(query, adapter, filter, interval, limit, nonBitmapDims, searchQuerySpec, retVal); return makeReturnResult(limit, retVal); } @@ -289,19 +240,22 @@ public class SearchQueryRunner implements QueryRunner> } private Sequence> makeReturnResult( - int limit, TreeMap retVal) + int limit, + Object2IntRBTreeMap retVal + ) { Iterable source = Iterables.transform( - retVal.entrySet(), new Function, SearchHit>() + retVal.object2IntEntrySet(), new Function, SearchHit>() { @Override - public SearchHit apply(Map.Entry input) + public SearchHit apply(Object2IntMap.Entry input) { SearchHit hit = input.getKey(); - return new SearchHit(hit.getDimension(), hit.getValue(), input.getValue().intValue()); + return new SearchHit(hit.getDimension(), hit.getValue(), input.getIntValue()); } } ); + return Sequences.simple( ImmutableList.of( new Result( @@ -313,4 +267,175 @@ public class SearchQueryRunner implements QueryRunner> ) ); } + + // Split dimension list into bitmap-supporting list and non-bitmap supporting list + private void partitionDimensionList( + QueryableIndex index, + StorageAdapter storageAdapter, + List dimensions, + List bitmapDims, + List nonBitmapDims + ) + { + List dimsToSearch; + if (dimensions == null || dimensions.isEmpty()) { + dimsToSearch = Lists.newArrayList(Iterables.transform( + storageAdapter.getAvailableDimensions(), + Druids.DIMENSION_IDENTITY + )); + } else { + dimsToSearch = dimensions; + } + + if (index != null) { + for (DimensionSpec spec : dimsToSearch) { + ColumnCapabilities capabilities = storageAdapter.getColumnCapabilities(spec.getDimension()); + if (capabilities == null) { + continue; + } + + if (capabilities.hasBitmapIndexes()) { + bitmapDims.add(spec); + } else { + nonBitmapDims.add(spec); + } + } + } else { + // no QueryableIndex available, so nothing has bitmaps + nonBitmapDims.addAll(dimsToSearch); + } + } + + private void processNonBitmapDims( + SearchQuery query, + final StorageAdapter adapter, + Filter filter, + Interval interval, + final int limit, + final List nonBitmapDims, + final SearchQuerySpec searchQuerySpec, + final Object2IntRBTreeMap retVal + ) + { + final Sequence cursors = adapter.makeCursors( + filter, + interval, + VirtualColumns.EMPTY, + query.getGranularity(), + query.isDescending() + ); + + cursors.accumulate( + retVal, + new Accumulator, Cursor>() + { + @Override + public Object2IntRBTreeMap accumulate(Object2IntRBTreeMap set, Cursor cursor) + { + if (set.size() >= limit) { + return set; + } + + List> selectorPlusList = Arrays.asList( + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + nonBitmapDims, + cursor + ) + ); + + while (!cursor.isDone()) { + for (ColumnSelectorPlus selectorPlus : selectorPlusList) { + selectorPlus.getColumnSelectorStrategy().updateSearchResultSet( + selectorPlus.getOutputName(), + selectorPlus.getSelector(), + searchQuerySpec, + limit, + set + ); + + if (set.size() >= limit) { + return set; + } + } + + cursor.advance(); + } + + return set; + } + } + ); + } + + private void processBitmapDims( + QueryableIndex index, + Filter filter, + Interval interval, + List bitmapDims, + SearchQuerySpec searchQuerySpec, + int limit, + final Object2IntRBTreeMap retVal + ) + { + final BitmapFactory bitmapFactory = index.getBitmapFactoryForDimensions(); + + final ImmutableBitmap baseFilter = + filter == null ? null : filter.getBitmapIndex(new ColumnSelectorBitmapIndexSelector(bitmapFactory, index)); + + ImmutableBitmap timeFilteredBitmap; + if (!interval.contains(segment.getDataInterval())) { + MutableBitmap timeBitmap = bitmapFactory.makeEmptyMutableBitmap(); + final Column timeColumn = index.getColumn(Column.TIME_COLUMN_NAME); + try (final GenericColumn timeValues = timeColumn.getGenericColumn()) { + + int startIndex = Math.max(0, getStartIndexOfTime(timeValues, interval.getStartMillis(), true)); + int endIndex = Math.min( + timeValues.length() - 1, + getStartIndexOfTime(timeValues, interval.getEndMillis(), false) + ); + + for (int i = startIndex; i <= endIndex; i++) { + timeBitmap.add(i); + } + + final ImmutableBitmap finalTimeBitmap = bitmapFactory.makeImmutableBitmap(timeBitmap); + timeFilteredBitmap = + (baseFilter == null) ? finalTimeBitmap : finalTimeBitmap.intersection(baseFilter); + } + } else { + timeFilteredBitmap = baseFilter; + } + + for (DimensionSpec dimension : bitmapDims) { + final Column column = index.getColumn(dimension.getDimension()); + if (column == null) { + continue; + } + + final BitmapIndex bitmapIndex = column.getBitmapIndex(); + ExtractionFn extractionFn = dimension.getExtractionFn(); + if (extractionFn == null) { + extractionFn = IdentityExtractionFn.getInstance(); + } + if (bitmapIndex != null) { + for (int i = 0; i < bitmapIndex.getCardinality(); ++i) { + String dimVal = Strings.nullToEmpty(extractionFn.apply(bitmapIndex.getValue(i))); + if (!searchQuerySpec.accept(dimVal)) { + continue; + } + ImmutableBitmap bitmap = bitmapIndex.getBitmap(i); + if (timeFilteredBitmap != null) { + bitmap = bitmapFactory.intersection(Arrays.asList(timeFilteredBitmap, bitmap)); + } + if (bitmap.size() > 0) { + retVal.addTo(new SearchHit(dimension.getOutputName(), dimVal), bitmap.size()); + if (retVal.size() >= limit) { + return; + } + } + } + } + } + } } diff --git a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java index eb6786f9edb..411e5db119c 100644 --- a/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java +++ b/processing/src/main/java/io/druid/query/select/SelectQueryEngine.java @@ -24,14 +24,20 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.guava.Sequence; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.QueryRunnerHelper; import io.druid.query.Result; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; import io.druid.query.filter.Filter; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.Cursor; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.DimensionSelector; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; @@ -39,12 +45,16 @@ import io.druid.segment.Segment; import io.druid.segment.StorageAdapter; import io.druid.segment.VirtualColumns; import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ValueType; import io.druid.segment.data.IndexedInts; import io.druid.segment.filter.Filters; import io.druid.timeline.DataSegmentUtils; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -52,6 +62,67 @@ import java.util.Map; */ public class SelectQueryEngine { + private static final SelectStrategyFactory STRATEGY_FACTORY = new SelectStrategyFactory(); + + private static class SelectStrategyFactory implements ColumnSelectorStrategyFactory + { + @Override + public SelectColumnSelectorStrategy makeColumnSelectorStrategy( + ColumnCapabilities capabilities + ) + { + ValueType type = capabilities.getType(); + switch(type) { + case STRING: + return new StringSelectColumnSelectorStrategy(); + default: + throw new IAE("Cannot create query type helper from invalid type [%s]", type); + } + } + } + + public interface SelectColumnSelectorStrategy extends ColumnSelectorStrategy + { + /** + * Read the current row from dimSelector and add the row values for a dimension to the result map. + * + * Multi-valued rows should be added to the result as a List, single value rows should be added as a single object. + * + * @param outputName Output name for this dimension in the select query being served + * @param dimSelector Dimension value selector + * @param resultMap Row value map for the current row being retrieved by the select query + */ + void addRowValuesToSelectResult( + String outputName, + ValueSelectorType dimSelector, + Map resultMap + ); + } + + public static class StringSelectColumnSelectorStrategy implements SelectColumnSelectorStrategy + { + @Override + public void addRowValuesToSelectResult(String outputName, DimensionSelector selector, Map resultMap) + { + if (selector == null) { + resultMap.put(outputName, null); + } else { + final IndexedInts vals = selector.getRow(); + + if (vals.size() == 1) { + final String dimVal = selector.lookupName(vals.get(0)); + resultMap.put(outputName, dimVal); + } else { + List dimVals = new ArrayList<>(vals.size()); + for (int i = 0; i < vals.size(); ++i) { + dimVals.add(selector.lookupName(vals.get(i))); + } + resultMap.put(outputName, dimVals); + } + } + } + } + public Sequence> process(final SelectQuery query, final Segment segment) { final StorageAdapter adapter = segment.asStorageAdapter(); @@ -106,11 +177,16 @@ public class SelectQueryEngine final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME); - final Map dimSelectors = Maps.newHashMap(); - for (DimensionSpec dim : dims) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim); - dimSelectors.put(dim.getOutputName(), dimSelector); - builder.addDimension(dim.getOutputName()); + final List> selectorPlusList = Arrays.asList( + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + Lists.newArrayList(dims), + cursor + ) + ); + + for (DimensionSpec dimSpec : dims) { + builder.addDimension(dimSpec.getOutputName()); } final Map metSelectors = Maps.newHashMap(); @@ -129,26 +205,8 @@ public class SelectQueryEngine final Map theEvent = Maps.newLinkedHashMap(); theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get())); - for (Map.Entry dimSelector : dimSelectors.entrySet()) { - final String dim = dimSelector.getKey(); - final DimensionSelector selector = dimSelector.getValue(); - - if (selector == null) { - theEvent.put(dim, null); - } else { - final IndexedInts vals = selector.getRow(); - - if (vals.size() == 1) { - final String dimVal = selector.lookupName(vals.get(0)); - theEvent.put(dim, dimVal); - } else { - List dimVals = Lists.newArrayList(); - for (int i = 0; i < vals.size(); ++i) { - dimVals.add(selector.lookupName(vals.get(i))); - } - theEvent.put(dim, dimVals); - } - } + for (ColumnSelectorPlus selectorPlus : selectorPlusList) { + selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent); } for (Map.Entry metSelector : metSelectors.entrySet()) { diff --git a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java index 3a7d16e51a6..2a669990982 100644 --- a/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/AggregateTopNMetricFirstAlgorithm.java @@ -22,12 +22,12 @@ package io.druid.query.topn; import io.druid.collections.StupidPool; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.PostAggregator; import io.druid.segment.Capabilities; import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; import java.nio.ByteBuffer; import java.util.Arrays; @@ -55,11 +55,11 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm implements TopNAlgorithm { - protected static Aggregator[] makeAggregators(Cursor cursor, List aggregatorSpecs) + public static Aggregator[] makeAggregators(Cursor cursor, List aggregatorSpecs) { Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()]; int aggregatorIndex = 0; @@ -58,7 +58,7 @@ public abstract class BaseTopNAlgorithm + public static class AggregatorArrayProvider extends BaseArrayProvider { Aggregator[][] expansionAggs; int cardinality; - public AggregatorArrayProvider(DimensionSelector dimSelector, TopNQuery query, int cardinality) + public AggregatorArrayProvider(DimensionSelector dimSelector, TopNQuery query, int cardinality, Capabilities capabilities) { super(dimSelector, query, capabilities); diff --git a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java index 765b380d2ce..dc9749758f1 100644 --- a/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/DimExtractionTopNAlgorithm.java @@ -20,18 +20,18 @@ package io.druid.query.topn; import com.google.common.collect.Maps; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.Aggregator; +import io.druid.query.topn.types.TopNColumnSelectorStrategy; import io.druid.segment.Capabilities; import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; -import io.druid.segment.data.IndexedInts; import java.util.Map; /** * This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value. */ -public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm, TopNParams> +public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm, TopNParams> { private final TopNQuery query; @@ -47,12 +47,12 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm selectorPlus, final Cursor cursor ) { return new TopNParams( - dimSelector, + selectorPlus, cursor, Integer.MAX_VALUE ); @@ -61,16 +61,8 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm selectorPlus = params.getSelectorPlus(); + return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, capabilities); } @Override @@ -80,7 +72,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm makeDimValAggregateStore(TopNParams params) + protected Map makeDimValAggregateStore(TopNParams params) { return Maps.newHashMap(); } @@ -89,35 +81,21 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm aggregatesStore, + Map aggregatesStore, int numProcessed ) { final Cursor cursor = params.getCursor(); - final DimensionSelector dimSelector = params.getDimSelector(); + final ColumnSelectorPlus selectorPlus = params.getSelectorPlus(); while (!cursor.isDone()) { - final IndexedInts dimValues = dimSelector.getRow(); - - for (int i = 0; i < dimValues.size(); ++i) { - - final int dimIndex = dimValues.get(i); - Aggregator[] theAggregators = rowSelector[dimIndex]; - if (theAggregators == null) { - final String key = dimSelector.lookupName(dimIndex); - theAggregators = aggregatesStore.get(key); - if (theAggregators == null) { - theAggregators = makeAggregators(cursor, query.getAggregatorSpecs()); - aggregatesStore.put(key, theAggregators); - } - rowSelector[dimIndex] = theAggregators; - } - - for (Aggregator aggregator : theAggregators) { - aggregator.aggregate(); - } - } - + selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate( + query, + selectorPlus.getSelector(), + cursor, + rowSelector, + aggregatesStore + ); cursor.advance(); } } @@ -126,11 +104,11 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm aggregatesStore, + Map aggregatesStore, TopNResultBuilder resultBuilder ) { - for (Map.Entry entry : aggregatesStore.entrySet()) { + for (Map.Entry entry : aggregatesStore.entrySet()) { Aggregator[] aggs = entry.getValue(); if (aggs != null && aggs.length > 0) { Object[] vals = new Object[aggs.length]; @@ -139,7 +117,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm stringMap) + protected void closeAggregators(Map valueMap) { - for (Aggregator[] aggregators : stringMap.values()) { + for (Aggregator[] aggregators : valueMap.values()) { for (Aggregator agg : aggregators) { agg.close(); } diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index c476ef19850..709d7508a7b 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -24,6 +24,7 @@ import io.druid.collections.StupidPool; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.ColumnSelectorPlus; import io.druid.segment.Capabilities; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; @@ -57,13 +58,14 @@ public class PooledTopNAlgorithm @Override public PooledTopNParams makeInitParams( - DimensionSelector dimSelector, Cursor cursor + ColumnSelectorPlus selectorPlus, Cursor cursor ) { ResourceHolder resultsBufHolder = bufferPool.take(); ByteBuffer resultsBuf = resultsBufHolder.get(); resultsBuf.clear(); + final DimensionSelector dimSelector = (DimensionSelector) selectorPlus.getSelector(); final int cardinality = dimSelector.getValueCardinality(); if (cardinality < 0) { @@ -103,7 +105,7 @@ public class PooledTopNAlgorithm final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality; return PooledTopNParams.builder() - .withDimSelector(dimSelector) + .withSelectorPlus(selectorPlus) .withCursor(cursor) .withResultsBufHolder(resultsBufHolder) .withResultsBuf(resultsBuf) @@ -507,7 +509,7 @@ public class PooledTopNAlgorithm private final TopNMetricSpecBuilder arrayProvider; public PooledTopNParams( - DimensionSelector dimSelector, + ColumnSelectorPlus selectorPlus, Cursor cursor, ResourceHolder resultsBufHolder, ByteBuffer resultsBuf, @@ -517,7 +519,7 @@ public class PooledTopNAlgorithm TopNMetricSpecBuilder arrayProvider ) { - super(dimSelector, cursor, numValuesPerPass); + super(selectorPlus, cursor, numValuesPerPass); this.resultsBufHolder = resultsBufHolder; this.resultsBuf = resultsBuf; @@ -558,7 +560,7 @@ public class PooledTopNAlgorithm public static class Builder { - private DimensionSelector dimSelector; + private ColumnSelectorPlus selectorPlus; private Cursor cursor; private ResourceHolder resultsBufHolder; private ByteBuffer resultsBuf; @@ -569,7 +571,7 @@ public class PooledTopNAlgorithm public Builder() { - dimSelector = null; + selectorPlus = null; cursor = null; resultsBufHolder = null; resultsBuf = null; @@ -579,9 +581,9 @@ public class PooledTopNAlgorithm arrayProvider = null; } - public Builder withDimSelector(DimensionSelector dimSelector) + public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus) { - this.dimSelector = dimSelector; + this.selectorPlus = selectorPlus; return this; } @@ -630,7 +632,7 @@ public class PooledTopNAlgorithm public PooledTopNParams build() { return new PooledTopNParams( - dimSelector, + selectorPlus, cursor, resultsBufHolder, resultsBuf, diff --git a/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java index 3d4980baf5c..f4fa5a22677 100644 --- a/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/TimeExtractionTopNAlgorithm.java @@ -21,6 +21,7 @@ package io.druid.query.topn; import com.google.common.collect.Maps; import io.druid.query.aggregation.Aggregator; +import io.druid.query.ColumnSelectorPlus; import io.druid.segment.Capabilities; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; @@ -40,10 +41,10 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm public static final int INIT_POSITION_VALUE = -1; public static final int SKIP_POSITION_VALUE = -2; - public TopNParams makeInitParams(DimensionSelector dimSelector, Cursor cursor); + public TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor); public void run( Parameters params, diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java index d31e84fa791..27e224a6137 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java @@ -20,12 +20,17 @@ package io.druid.query.topn; import com.google.common.base.Function; +import com.google.common.collect.Lists; import io.druid.query.Result; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.topn.types.TopNStrategyFactory; import io.druid.segment.Cursor; -import io.druid.segment.DimensionSelector; +import io.druid.segment.DimensionHandlerUtils; public class TopNMapFn implements Function> { + private static final TopNStrategyFactory STRATEGY_FACTORY = new TopNStrategyFactory(); + private final TopNQuery query; private final TopNAlgorithm topNAlgorithm; @@ -42,16 +47,19 @@ public class TopNMapFn implements Function> @SuppressWarnings("unchecked") public Result apply(Cursor cursor) { - final DimensionSelector dimSelector = cursor.makeDimensionSelector( - query.getDimensionSpec() + final ColumnSelectorPlus[] selectorPlusArray = DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + Lists.newArrayList(query.getDimensionSpec()), + cursor ); - if (dimSelector == null) { + + if (selectorPlusArray[0].getSelector() == null) { return null; } TopNParams params = null; try { - params = topNAlgorithm.makeInitParams(dimSelector, cursor); + params = topNAlgorithm.makeInitParams(selectorPlusArray[0], cursor); TopNResultBuilder resultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, query); diff --git a/processing/src/main/java/io/druid/query/topn/TopNParams.java b/processing/src/main/java/io/druid/query/topn/TopNParams.java index d9e75a82c08..c973267fe34 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNParams.java +++ b/processing/src/main/java/io/druid/query/topn/TopNParams.java @@ -19,6 +19,8 @@ package io.druid.query.topn; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.topn.types.TopNColumnSelectorStrategy; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; @@ -26,20 +28,20 @@ import io.druid.segment.DimensionSelector; */ public class TopNParams { - private final DimensionSelector dimSelector; private final Cursor cursor; private final int cardinality; private final int numValuesPerPass; + private final ColumnSelectorPlus selectorPlus; protected TopNParams( - DimensionSelector dimSelector, + ColumnSelectorPlus selectorPlus, Cursor cursor, int numValuesPerPass ) { - this.dimSelector = dimSelector; + this.selectorPlus = selectorPlus; this.cursor = cursor; - this.cardinality = dimSelector.getValueCardinality(); + this.cardinality = selectorPlus.getColumnSelectorStrategy().getCardinality(selectorPlus.getSelector()); this.numValuesPerPass = numValuesPerPass; if (cardinality < 0) { @@ -47,9 +49,16 @@ public class TopNParams } } + // Only used by TopN algorithms that support String exclusively + // Otherwise, get an appropriately typed selector from getSelectorPlus() public DimensionSelector getDimSelector() { - return dimSelector; + return (DimensionSelector) selectorPlus.getSelector(); + } + + public ColumnSelectorPlus getSelectorPlus() + { + return selectorPlus; } public Cursor getCursor() diff --git a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java index bb4e121f6e6..d8aef7b5465 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java +++ b/processing/src/main/java/io/druid/query/topn/TopNQueryEngine.java @@ -94,7 +94,6 @@ public class TopNQueryEngine { final Capabilities capabilities = adapter.getCapabilities(); final String dimension = query.getDimensionSpec().getDimension(); - final int cardinality = adapter.getDimensionCardinality(dimension); int numBytesPerRecord = 0; diff --git a/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java new file mode 100644 index 00000000000..aa25e03b775 --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/types/StringTopNColumnSelectorStrategy.java @@ -0,0 +1,69 @@ +package io.druid.query.topn.types; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.topn.BaseTopNAlgorithm; +import io.druid.query.topn.TopNParams; +import io.druid.query.topn.TopNQuery; +import io.druid.segment.Capabilities; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.data.IndexedInts; + +import java.util.Map; + +public class StringTopNColumnSelectorStrategy implements TopNColumnSelectorStrategy +{ + @Override + public int getCardinality(DimensionSelector selector) + { + return selector.getValueCardinality(); + } + + @Override + public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, Capabilities capabilities) + { + // This method is used for the DimExtractionTopNAlgorithm only. + // Unlike regular topN we cannot rely on ordering to optimize. + // Optimization possibly requires a reverse lookup from value to ID, which is + // not possible when applying an extraction function + + final BaseTopNAlgorithm.AggregatorArrayProvider provider = new BaseTopNAlgorithm.AggregatorArrayProvider( + (DimensionSelector) params.getSelectorPlus().getSelector(), + query, + params.getCardinality(), + capabilities + ); + + return provider.build(); + } + + @Override + public void dimExtractionScanAndAggregate( + final TopNQuery query, + DimensionSelector selector, + Cursor cursor, + Aggregator[][] rowSelector, + Map aggregatesStore + ) + { + final IndexedInts dimValues = selector.getRow(); + + for (int i = 0; i < dimValues.size(); ++i) { + final int dimIndex = dimValues.get(i); + Aggregator[] theAggregators = rowSelector[dimIndex]; + if (theAggregators == null) { + final String key = selector.lookupName(dimIndex); + theAggregators = aggregatesStore.get(key); + if (theAggregators == null) { + theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs()); + aggregatesStore.put(key, theAggregators); + } + rowSelector[dimIndex] = theAggregators; + } + + for (Aggregator aggregator : theAggregators) { + aggregator.aggregate(); + } + } + } +} diff --git a/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java new file mode 100644 index 00000000000..01e293bc4ee --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/types/TopNColumnSelectorStrategy.java @@ -0,0 +1,82 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn.types; + +import io.druid.query.aggregation.Aggregator; +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.query.topn.TopNParams; +import io.druid.query.topn.TopNQuery; +import io.druid.segment.Capabilities; +import io.druid.segment.ColumnValueSelector; +import io.druid.segment.Cursor; + +import java.util.Map; + +public interface TopNColumnSelectorStrategy extends ColumnSelectorStrategy +{ + int getCardinality(ValueSelectorType selector); + + /** + * Used by DimExtractionTopNAlgorithm. + * + * Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters. + * + * As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types + * that use integer row values. + * + * A dimension type that does not have integer values should return null. + * + * @param query The TopN query being served + * @param params Parameters for the TopN query being served + * @param capabilities Object indicating if dimension values are sorted + * @return an Aggregator[][] for integer-valued dimensions, null otherwise + */ + Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, Capabilities capabilities); + + + /** + * Used by DimExtractionTopNAlgorithm. + * + * Read the current row from a dimension value selector, and for each row value: + * 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup) or from + * aggregatesStore (slower map). + * + * 2. If the rowSelector and/or aggregatesStore did not have an entry for a particular row value, + * this function should retrieve the current Aggregator[] using BaseTopNAlgorithm.makeAggregators() and the + * provided cursor and query, storing them in rowSelector and aggregatesStore + * + * 3. Call aggregate() on each of the aggregators. + * + * If a dimension type doesn't have integer values, it should ignore rowSelector and use the aggregatesStore map only. + * + * @param query The TopN query being served. + * @param selector Dimension value selector + * @param cursor Cursor for the segment being queried + * @param rowSelector Integer lookup containing aggregators + * @param aggregatesStore Map containing aggregators + */ + void dimExtractionScanAndAggregate( + final TopNQuery query, + ValueSelectorType selector, + Cursor cursor, + Aggregator[][] rowSelector, + Map aggregatesStore + ); +} diff --git a/processing/src/main/java/io/druid/query/topn/types/TopNStrategyFactory.java b/processing/src/main/java/io/druid/query/topn/types/TopNStrategyFactory.java new file mode 100644 index 00000000000..0a141b0aa4f --- /dev/null +++ b/processing/src/main/java/io/druid/query/topn/types/TopNStrategyFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.topn.types; + +import io.druid.java.util.common.IAE; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ValueType; + +public class TopNStrategyFactory implements ColumnSelectorStrategyFactory +{ + @Override + public TopNColumnSelectorStrategy makeColumnSelectorStrategy( + ColumnCapabilities capabilities + ) + { + ValueType type = capabilities.getType(); + switch(type) { + case STRING: + return new StringTopNColumnSelectorStrategy(); + default: + throw new IAE("Cannot create query type helper from invalid type [%s]", type); + } + } +} diff --git a/processing/src/main/java/io/druid/segment/ColumnValueSelector.java b/processing/src/main/java/io/druid/segment/ColumnValueSelector.java new file mode 100644 index 00000000000..d73ac68affe --- /dev/null +++ b/processing/src/main/java/io/druid/segment/ColumnValueSelector.java @@ -0,0 +1,27 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +/** + * Base type for interfaces that manage column value selection, e.g. DimensionSelector, LongColumnSelector + */ +public interface ColumnValueSelector +{ +} diff --git a/processing/src/main/java/io/druid/segment/DimensionHandler.java b/processing/src/main/java/io/druid/segment/DimensionHandler.java index 411d76aaaaa..8f2e791cf8e 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandler.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandler.java @@ -26,6 +26,7 @@ import io.druid.segment.data.Indexed; import java.io.Closeable; import java.io.File; +import java.io.IOException; /** * Processing related interface @@ -61,7 +62,7 @@ public interface DimensionHandler, E * * @return Dimension name */ - public String getDimensionName(); + String getDimensionName(); /** @@ -70,7 +71,7 @@ public interface DimensionHandler, E * * @return A new DimensionIndexer object. */ - public DimensionIndexer makeIndexer(); + DimensionIndexer makeIndexer(); /** @@ -87,13 +88,13 @@ public interface DimensionHandler, E * @return A new DimensionMergerV9 object. */ - public DimensionMergerV9 makeMerger( + DimensionMergerV9 makeMerger( IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress - ); + ) throws IOException; /** @@ -110,14 +111,13 @@ public interface DimensionHandler, E * @return A new DimensionMergerLegacy object. */ - public DimensionMergerLegacy makeLegacyMerger( + DimensionMergerLegacy makeLegacyMerger( IndexSpec indexSpec, File outDir, IOPeon ioPeon, ColumnCapabilities capabilities, ProgressIndicator progress - ); - + ) throws IOException; /** * Given an array representing a single set of row value(s) for this dimension as an Object, @@ -128,7 +128,7 @@ public interface DimensionHandler, E * @param dimVals Array of row values * @return Size of dimVals */ - public int getLengthFromEncodedArray(EncodedTypeArray dimVals); + int getLengthFromEncodedArray(EncodedTypeArray dimVals); /** @@ -143,7 +143,7 @@ public interface DimensionHandler, E * * @return integer indicating comparison result of arrays */ - public int compareSortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs); + int compareSortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs); /** @@ -164,7 +164,7 @@ public interface DimensionHandler, E * * @return integer indicating comparison result of arrays */ - public void validateSortedEncodedArrays( + void validateSortedEncodedArrays( EncodedTypeArray lhs, EncodedTypeArray rhs, Indexed lhsEncodings, @@ -182,7 +182,7 @@ public interface DimensionHandler, E * @param column Column for this dimension from a QueryableIndex * @return The type-specific column subobject for this dimension. */ - public Closeable getSubColumn(Column column); + Closeable getSubColumn(Column column); /** @@ -196,5 +196,5 @@ public interface DimensionHandler, E * @param currRow The index of the row to retrieve * @return The row from "column" specified by "currRow", as an array of values */ - public Object getRowValueArrayFromColumn(Closeable column, int currRow); + Object getRowValueArrayFromColumn(Closeable column, int currRow); } diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtil.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtil.java deleted file mode 100644 index 87376e149ff..00000000000 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtil.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.segment; - -import io.druid.java.util.common.IAE; -import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; -import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ValueType; - -public final class DimensionHandlerUtil -{ - private DimensionHandlerUtil() {} - - public static DimensionHandler getHandlerFromCapabilities( - String dimensionName, - ColumnCapabilities capabilities, - MultiValueHandling multiValueHandling - ) - { - DimensionHandler handler = null; - if (capabilities.getType() == ValueType.STRING) { - if (!capabilities.isDictionaryEncoded() || !capabilities.hasBitmapIndexes()) { - throw new IAE("String column must have dictionary encoding and bitmap index."); - } - // use default behavior - multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling; - handler = new StringDimensionHandler(dimensionName, multiValueHandling); - } - if (handler == null) { - throw new IAE("Could not create handler from invalid column type: " + capabilities.getType()); - } - return handler; - } -} diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java new file mode 100644 index 00000000000..2eb773126c4 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -0,0 +1,159 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment; + +import io.druid.java.util.common.IAE; +import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.dimension.ColumnSelectorStrategy; +import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ColumnCapabilitiesImpl; +import io.druid.segment.column.ValueType; + +import java.util.List; + +public final class DimensionHandlerUtils +{ + private DimensionHandlerUtils() {} + + public final static ColumnCapabilities DEFAULT_STRING_CAPABILITIES = + new ColumnCapabilitiesImpl().setType(ValueType.STRING) + .setDictionaryEncoded(true) + .setHasBitmapIndexes(true); + + public static DimensionHandler getHandlerFromCapabilities( + String dimensionName, + ColumnCapabilities capabilities, + MultiValueHandling multiValueHandling + ) + { + if (capabilities == null) { + return new StringDimensionHandler(dimensionName, multiValueHandling); + } + + multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling; + + if (capabilities.getType() == ValueType.STRING) { + if (!capabilities.isDictionaryEncoded() || !capabilities.hasBitmapIndexes()) { + throw new IAE("String column must have dictionary encoding and bitmap index."); + } + return new StringDimensionHandler(dimensionName, multiValueHandling); + } + + // Return a StringDimensionHandler by default (null columns will be treated as String typed) + return new StringDimensionHandler(dimensionName, multiValueHandling); + } + + /** + * Creates an array of ColumnSelectorPlus objects, selectors that handle type-specific operations within + * query processing engines, using a strategy factory provided by the query engine. One ColumnSelectorPlus + * will be created for each column specified in dimensionSpecs. + * + * The ColumnSelectorPlus provides access to a type strategy (e.g., how to group on a float column) + * and a value selector for a single column. + * + * A caller should define a strategy factory that provides an interface for type-specific operations + * in a query engine. See GroupByStrategyFactory for a reference. + * + * @param The strategy type created by the provided strategy factory. + * @param strategyFactory A factory provided by query engines that generates type-handling strategies + * @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for + * @param cursor Used to create value selectors for columns. + * @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs + */ + public static ColumnSelectorPlus[] createColumnSelectorPluses( + ColumnSelectorStrategyFactory strategyFactory, + List dimensionSpecs, + ColumnSelectorFactory cursor + ) + { + int dimCount = dimensionSpecs.size(); + ColumnSelectorPlus[] dims = new ColumnSelectorPlus[dimCount]; + for (int i = 0; i < dimCount; i++) { + final DimensionSpec dimSpec = dimensionSpecs.get(i); + final String dimName = dimSpec.getDimension(); + ColumnSelectorStrategyClass strategy = makeStrategy( + strategyFactory, + dimName, + cursor.getColumnCapabilities(dimSpec.getDimension()) + ); + final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec( + dimSpec, + cursor + ); + final ColumnSelectorPlus selectorPlus = new ColumnSelectorPlus<>( + dimName, + dimSpec.getOutputName(), + strategy, + selector + ); + dims[i] = selectorPlus; + } + return dims; + } + + // When determining the capabilites of a column during query processing, this function + // adjusts the capabilities for columns that cannot be handled as-is to manageable defaults + // (e.g., treating missing columns as empty String columns) + private static ColumnCapabilities getEffectiveCapabilities( + String dimName, + ColumnCapabilities capabilities + ) + { + if (capabilities == null) { + capabilities = DEFAULT_STRING_CAPABILITIES; + } + + // non-Strings aren't actually supported yet + if (capabilities.getType() != ValueType.STRING) { + capabilities = DEFAULT_STRING_CAPABILITIES; + } + + return capabilities; + } + + private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec( + DimensionSpec dimSpec, + ColumnSelectorFactory columnSelectorFactory + ) + { + String dimName = dimSpec.getDimension(); + ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName); + capabilities = getEffectiveCapabilities(dimName, capabilities); + switch (capabilities.getType()) { + case STRING: + return columnSelectorFactory.makeDimensionSelector(dimSpec); + default: + return null; + } + } + + private static ColumnSelectorStrategyClass makeStrategy( + ColumnSelectorStrategyFactory strategyFactory, + String dimName, + ColumnCapabilities capabilities + ) + { + capabilities = getEffectiveCapabilities(dimName, capabilities); + return strategyFactory.makeColumnSelectorStrategy(capabilities); + } +} diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index ed41d714524..50ad5e65340 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -119,7 +119,7 @@ public interface DimensionIndexer, E * * @return An array containing an encoded representation of the input row value. */ - public EncodedTypeArray processRowValsToUnsortedEncodedArray(Object dimValues); + EncodedTypeArray processRowValsToUnsortedEncodedArray(Object dimValues); /** @@ -132,7 +132,7 @@ public interface DimensionIndexer, E * @param unsortedIntermediateValue value to convert * @return converted value */ - public EncodedType getSortedEncodedValueFromUnsorted(EncodedType unsortedIntermediateValue); + EncodedType getSortedEncodedValueFromUnsorted(EncodedType unsortedIntermediateValue); /** @@ -145,7 +145,7 @@ public interface DimensionIndexer, E * @param sortedIntermediateValue value to convert * @return converted value */ - public EncodedType getUnsortedEncodedValueFromSorted(EncodedType sortedIntermediateValue); + EncodedType getUnsortedEncodedValueFromSorted(EncodedType sortedIntermediateValue); /** @@ -159,7 +159,7 @@ public interface DimensionIndexer, E * * @return Sorted index of actual values */ - public Indexed getSortedIndexedValues(); + Indexed getSortedIndexedValues(); /** @@ -177,7 +177,7 @@ public interface DimensionIndexer, E * * @return min value */ - public ActualType getMinValue(); + ActualType getMinValue(); /** @@ -185,7 +185,7 @@ public interface DimensionIndexer, E * * @return max value */ - public ActualType getMaxValue(); + ActualType getMaxValue(); /** @@ -193,7 +193,7 @@ public interface DimensionIndexer, E * * @return value cardinality */ - public int getCardinality(); + int getCardinality(); /** @@ -210,7 +210,7 @@ public interface DimensionIndexer, E * @param desc Descriptor object for this dimension within an IncrementalIndex * @return A new object that reads rows from currEntry */ - public Object makeColumnValueSelector( + Object makeColumnValueSelector( DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc @@ -239,7 +239,7 @@ public interface DimensionIndexer, E * @param rhs dimension value array from a TimeAndDims key * @return comparison of the two arrays */ - public int compareUnsortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs); + int compareUnsortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs); /** @@ -249,7 +249,7 @@ public interface DimensionIndexer, E * @param rhs dimension value array from a TimeAndDims key * @return true if the two arrays are equal */ - public boolean checkUnsortedEncodedArraysEqual(EncodedTypeArray lhs, EncodedTypeArray rhs); + boolean checkUnsortedEncodedArraysEqual(EncodedTypeArray lhs, EncodedTypeArray rhs); /** @@ -257,10 +257,10 @@ public interface DimensionIndexer, E * @param key dimension value array from a TimeAndDims key * @return hashcode of the array */ - public int getUnsortedEncodedArrayHashCode(EncodedTypeArray key); + int getUnsortedEncodedArrayHashCode(EncodedTypeArray key); - public static final boolean LIST = true; - public static final boolean ARRAY = false; + boolean LIST = true; + boolean ARRAY = false; /** * Given a row value array from a TimeAndDims key, as described in the documentation for compareUnsortedEncodedArrays(), @@ -273,7 +273,7 @@ public interface DimensionIndexer, E * @param asList if true, return an array; if false, return a list * @return single value, array, or list containing the actual values corresponding to the encoded values in the input array */ - public Object convertUnsortedEncodedArrayToActualArrayOrList(EncodedTypeArray key, boolean asList); + Object convertUnsortedEncodedArrayToActualArrayOrList(EncodedTypeArray key, boolean asList); /** @@ -283,7 +283,7 @@ public interface DimensionIndexer, E * @param key dimension value array from a TimeAndDims key * @return array containing the sorted encoded values corresponding to the unsorted encoded values in the input array */ - public EncodedTypeArray convertUnsortedEncodedArrayToSortedEncodedArray(EncodedTypeArray key); + EncodedTypeArray convertUnsortedEncodedArrayToSortedEncodedArray(EncodedTypeArray key); /** @@ -307,7 +307,7 @@ public interface DimensionIndexer, E * @param bitmapIndexes array of bitmaps, indexed by integer dimension value * @param factory bitmap factory */ - public void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory); + void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory); /** @@ -326,8 +326,7 @@ public interface DimensionIndexer, E * @param dimIndex the array index of this indexer's dimension within the TimeAndDims key * @return A ValueMatcher that matches a dimension value array from a TimeAndDims key against "matchValue" */ - public ValueMatcher makeIndexingValueMatcher(String matchValue, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex); - + ValueMatcher makeIndexingValueMatcher(String matchValue, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex); /** * Return a ValueMatcher that accepts an EntryHolder containing the current TimeAndDims key and the array index of this @@ -350,5 +349,5 @@ public interface DimensionIndexer, E * @param dimIndex the array index of this indexer's dimension within the TimeAndDims key * @return A ValueMatcher that applies a predicate from the predicateFactory to the dimension values in the TimeAndDim keys */ - public ValueMatcher makeIndexingValueMatcher(DruidPredicateFactory predicateFactory, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex); + ValueMatcher makeIndexingValueMatcher(DruidPredicateFactory predicateFactory, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex); } diff --git a/processing/src/main/java/io/druid/segment/DimensionMerger.java b/processing/src/main/java/io/druid/segment/DimensionMerger.java index 4dbf406abe7..d22cc5c0a05 100644 --- a/processing/src/main/java/io/druid/segment/DimensionMerger.java +++ b/processing/src/main/java/io/druid/segment/DimensionMerger.java @@ -68,7 +68,7 @@ public interface DimensionMerger * @param adapters List of adapters to be merged. * @throws IOException */ - public void writeMergedValueMetadata(List adapters) throws IOException; + void writeMergedValueMetadata(List adapters) throws IOException; /** @@ -86,7 +86,7 @@ public interface DimensionMerger * @param segmentRow A row from a segment to be converted to its representation within the merged sequence of rows. * @param segmentIndexNumber Integer indicating which segment the row originated from. */ - public EncodedTypedArray convertSegmentRowValuesToMergedRowValues(EncodedTypedArray segmentRow, int segmentIndexNumber); + EncodedTypedArray convertSegmentRowValuesToMergedRowValues(EncodedTypedArray segmentRow, int segmentIndexNumber); /** @@ -101,7 +101,7 @@ public interface DimensionMerger * @param rowValues The row values to be added. * @throws IOException */ - public void processMergedRow(EncodedTypedArray rowValues) throws IOException; + void processMergedRow(EncodedTypedArray rowValues) throws IOException; /** @@ -125,7 +125,7 @@ public interface DimensionMerger * @param closer Add Closeables for resource cleanup to this Closer if needed * @throws IOException */ - public void writeIndexes(List segmentRowNumConversions, Closer closer) throws IOException; + void writeIndexes(List segmentRowNumConversions, Closer closer) throws IOException; /** @@ -135,5 +135,5 @@ public interface DimensionMerger * * @return true if this dimension can be excluded from the merged segment. */ - public boolean canSkip(); + boolean canSkip(); } diff --git a/processing/src/main/java/io/druid/segment/DimensionMergerLegacy.java b/processing/src/main/java/io/druid/segment/DimensionMergerLegacy.java index 360eb8672b2..0da997642fe 100644 --- a/processing/src/main/java/io/druid/segment/DimensionMergerLegacy.java +++ b/processing/src/main/java/io/druid/segment/DimensionMergerLegacy.java @@ -23,6 +23,7 @@ import com.google.common.io.ByteSink; import com.google.common.io.OutputSupplier; import io.druid.common.guava.FileOutputSupplier; +import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -41,7 +42,7 @@ public interface DimensionMergerLegacy extends DimensionMerger * @param valueEncodingFile Destination file * @throws IOException */ - public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException; + void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException; /** @@ -49,7 +50,7 @@ public interface DimensionMergerLegacy extends DimensionMerger * @param rowValueOut Destination file * @throws IOException */ - public void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException; + void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException; /** @@ -58,8 +59,11 @@ public interface DimensionMergerLegacy extends DimensionMerger * @param spatialOut Destination file for spatial indexes * @throws IOException */ - public void writeIndexesToFiles( + void writeIndexesToFiles( ByteSink invertedOut, OutputSupplier spatialOut ) throws IOException; + + + File makeDimFile() throws IOException; } diff --git a/processing/src/main/java/io/druid/segment/DimensionMergerV9.java b/processing/src/main/java/io/druid/segment/DimensionMergerV9.java index c344c84c540..c95a757ee51 100644 --- a/processing/src/main/java/io/druid/segment/DimensionMergerV9.java +++ b/processing/src/main/java/io/druid/segment/DimensionMergerV9.java @@ -21,6 +21,8 @@ package io.druid.segment; import io.druid.segment.column.ColumnDescriptor; +import java.io.IOException; + /** * Processing related interface * @@ -34,5 +36,5 @@ public interface DimensionMergerV9 extends DimensionMerger +public interface ObjectColumnSelector extends ColumnValueSelector { public Class classOfObject(); public T get(); diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 9e3a04a017c..005bf92c90a 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -20,9 +20,7 @@ package io.druid.segment; import com.google.common.base.Function; -import com.google.common.base.Predicate; import com.google.common.base.Predicates; -import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -36,6 +34,7 @@ import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.math.expr.Expr; import io.druid.math.expr.Parser; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.QueryInterruptedException; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -46,6 +45,8 @@ import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.RowOffsetMatcherFactory; import io.druid.query.filter.ValueMatcher; +import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; +import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; import io.druid.query.filter.ValueMatcherFactory; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; @@ -303,7 +304,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return Sequences.filter( new CursorSequenceBuilder( - index, + this, actualInterval, virtualColumns, gran, @@ -329,7 +330,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private static class CursorSequenceBuilder { - private final ColumnSelector index; + private final StorageAdapter storageAdapter; + private final QueryableIndex index; private final Interval interval; private final VirtualColumns virtualColumns; private final QueryGranularity gran; @@ -341,7 +343,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private final ColumnSelectorBitmapIndexSelector bitmapIndexSelector; public CursorSequenceBuilder( - ColumnSelector index, + QueryableIndexStorageAdapter storageAdapter, Interval interval, VirtualColumns virtualColumns, QueryGranularity gran, @@ -353,7 +355,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter ColumnSelectorBitmapIndexSelector bitmapIndexSelector ) { - this.index = index; + this.storageAdapter = storageAdapter; + this.index = storageAdapter.index; this.interval = interval; this.virtualColumns = virtualColumns; this.gran = gran; @@ -925,7 +928,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter return new QueryableIndexBaseCursor() { CursorOffsetHolderValueMatcherFactory valueMatcherFactory = new CursorOffsetHolderValueMatcherFactory( - index, + storageAdapter, this ); RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory( @@ -1039,98 +1042,64 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private static class CursorOffsetHolderValueMatcherFactory implements ValueMatcherFactory { - private final ColumnSelector index; + private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY = + new ValueMatcherColumnSelectorStrategyFactory(); + + private final StorageAdapter storageAdapter; private final ColumnSelectorFactory cursor; + private final List availableMetrics; public CursorOffsetHolderValueMatcherFactory( - ColumnSelector index, + StorageAdapter storageAdapter, ColumnSelectorFactory cursor ) { - this.index = index; + this.storageAdapter = storageAdapter; this.cursor = cursor; + this.availableMetrics = Lists.newArrayList(storageAdapter.getAvailableMetrics()); } @Override public ValueMatcher makeValueMatcher(String dimension, final String value) { - if (getTypeForDimension(dimension) == ValueType.LONG) { - return Filters.getLongValueMatcher( - cursor.makeLongColumnSelector(dimension), - value - ); + if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) { + if (getTypeForDimension(dimension) == ValueType.LONG) { + return Filters.getLongValueMatcher( + cursor.makeLongColumnSelector(dimension), + value + ); + } } - final DimensionSelector selector = cursor.makeDimensionSelector( - new DefaultDimensionSpec(dimension, dimension) - ); + ColumnSelectorPlus[] selector = + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + ImmutableList.of(DefaultDimensionSpec.of(dimension)), + cursor + ); - // if matching against null, rows with size 0 should also match - final boolean matchNull = Strings.isNullOrEmpty(value); - - final int id = selector.lookupId(value); - if (id < 0) { - return new BooleanValueMatcher(false); - } else { - return new ValueMatcher() - { - @Override - public boolean matches() - { - IndexedInts row = selector.getRow(); - if (row.size() == 0) { - return matchNull; - } - for (int i = 0; i < row.size(); i++) { - if (row.get(i) == id) { - return true; - } - } - return false; - } - }; - } + final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); + return strategy.getValueMatcher(dimension, cursor, value); } @Override public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory) { - ValueType type = getTypeForDimension(dimension); - switch (type) { - case LONG: + if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) { + if (getTypeForDimension(dimension) == ValueType.LONG) { return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate()); - case STRING: - return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate()); - default: - return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null)); - } - } - - private ValueMatcher makeStringValueMatcher(String dimension, final Predicate predicate) - { - final DimensionSelector selector = cursor.makeDimensionSelector( - new DefaultDimensionSpec(dimension, dimension) - ); - - return new ValueMatcher() - { - final boolean matchNull = predicate.apply(null); - - @Override - public boolean matches() - { - IndexedInts row = selector.getRow(); - if (row.size() == 0) { - return matchNull; - } - for (int i = 0; i < row.size(); i++) { - if (predicate.apply(selector.lookupName(row.get(i)))) { - return true; - } - } - return false; } - }; + } + + ColumnSelectorPlus[] selector = + DimensionHandlerUtils.createColumnSelectorPluses( + STRATEGY_FACTORY, + ImmutableList.of(DefaultDimensionSpec.of(dimension)), + cursor + ); + + final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); + return strategy.getValueMatcher(dimension, cursor, predicateFactory); } private ValueMatcher makeLongValueMatcher(String dimension, final DruidLongPredicate predicate) @@ -1143,7 +1112,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter private ValueType getTypeForDimension(String dimension) { - ColumnCapabilities capabilities = getColumnCapabilites(index, dimension); + ColumnCapabilities capabilities = cursor.getColumnCapabilities(dimension); return capabilities == null ? ValueType.STRING : capabilities.getType(); } } diff --git a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java index f91fa7aa029..699f0530f5c 100644 --- a/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/SimpleQueryableIndex.java @@ -124,7 +124,7 @@ public class SimpleQueryableIndex implements QueryableIndex { for (String dim : availableDimensions) { ColumnCapabilities capabilities = getColumn(dim).getCapabilities(); - DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities, null); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); dimensionHandlers.put(dim, handler); } } diff --git a/processing/src/main/java/io/druid/segment/StringDimensionHandler.java b/processing/src/main/java/io/druid/segment/StringDimensionHandler.java index 6be0802408f..b45f95edec2 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionHandler.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionHandler.java @@ -32,7 +32,6 @@ import java.io.Closeable; import java.io.File; import java.lang.reflect.Array; import java.util.Arrays; -import java.util.Comparator; public class StringDimensionHandler implements DimensionHandler { @@ -214,20 +213,4 @@ public class StringDimensionHandler implements DimensionHandler ENCODED_COMPARATOR = new Comparator() - { - @Override - public int compare(Integer o1, Integer o2) - { - if (o1 == null) { - return o2 == null ? 0 : -1; - } - if (o2 == null) { - return 1; - } - return o1.compareTo(o2); - } - }; - } diff --git a/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java b/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java index e55e999da80..d4c4616a366 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionMergerLegacy.java @@ -213,4 +213,12 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme spatialIoPeon.cleanup(); } } + + @Override + public File makeDimFile() throws IOException + { + return IndexIO.makeDimFile(outDir, dimensionName); + } } + + diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 6ec6db6d5f9..0b5210febfa 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -49,7 +49,7 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionHandler; -import io.druid.segment.DimensionHandlerUtil; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.DimensionIndexer; import io.druid.segment.DimensionSelector; import io.druid.segment.FloatColumnSelector; @@ -206,8 +206,8 @@ public abstract class IncrementalIndex implements Iterable, // This ColumnSelectorFactory implementation has no knowledge of column capabilities. // However, this method may still be called by FilteredAggregatorFactory's ValueMatcherFactory // to check column types. - // Just return null, the caller will assume default types in that case. - return null; + // If column capabilities are not available, return null, the caller will assume default types in that case. + return columnCapabilities == null ? null : columnCapabilities.get(columnName); } @Override @@ -407,7 +407,7 @@ public abstract class IncrementalIndex implements Iterable, if (dimSchema.getTypeName().equals(DimensionSchema.SPATIAL_TYPE_NAME)) { capabilities.setHasSpatialIndexes(true); } else { - DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities( + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities( dimName, capabilities, dimSchema.getMultiValueHandling() @@ -567,7 +567,7 @@ public abstract class IncrementalIndex implements Iterable, capabilities.setHasBitmapIndexes(true); columnCapabilities.put(dimension, capabilities); } - DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dimension, capabilities, null); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null); desc = addNewDimension(dimension, capabilities, handler); } DimensionHandler handler = desc.getHandler(); @@ -747,7 +747,7 @@ public abstract class IncrementalIndex implements Iterable, if (dimensionDescs.get(dim) == null) { ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim); columnCapabilities.put(dim, capabilities); - DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities, null); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); addNewDimension(dim, capabilities, handler); } } diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index d4cdfc016d2..5f6e9d3758e 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -139,6 +139,7 @@ public class MultiValuedDimensionTest "2011-01-12T00:00:00.000Z,product_1,t1\tt2\tt3", "2011-01-13T00:00:00.000Z,product_2,t3\tt4\tt5", "2011-01-14T00:00:00.000Z,product_3,t5\tt6\tt7", + "2011-01-14T00:00:00.000Z,product_4" }; for (String row : rows) { @@ -180,6 +181,7 @@ public class MultiValuedDimensionTest ); List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 2L), GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t1", "count", 2L), GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t2", "count", 2L), GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t3", "count", 4L), diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java index b57bf2e3247..2687003d0ea 100644 --- a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java +++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorBenchmark.java @@ -28,6 +28,9 @@ import com.google.common.collect.DiscreteDomain; import com.google.common.collect.FluentIterable; import com.google.common.collect.Lists; import com.google.common.collect.Range; +import io.druid.query.ColumnSelectorPlus; +import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; +import io.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.segment.DimensionSelector; @@ -41,6 +44,7 @@ public class CardinalityAggregatorBenchmark extends SimpleBenchmark CardinalityBufferAggregator agg; List selectorList; + List> dimInfoList; ByteBuffer buf; int pos; @@ -75,16 +79,24 @@ public class CardinalityAggregatorBenchmark extends SimpleBenchmark .cycle() .limit(MAX); - + final DimensionSpec dimSpec1 = new DefaultDimensionSpec("dim1", "dim1"); final CardinalityAggregatorTest.TestDimensionSelector dim1 = new CardinalityAggregatorTest.TestDimensionSelector(values, null); + final ColumnSelectorPlus dimInfo1 = new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), + dim1 + ); selectorList = Lists.newArrayList( (DimensionSelector) dim1 ); + dimInfoList = Lists.newArrayList(dimInfo1); + agg = new CardinalityBufferAggregator( - selectorList, + dimInfoList, byRow ); diff --git a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java index a6c7f00033f..aa8560513f4 100644 --- a/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java +++ b/processing/src/test/java/io/druid/query/aggregation/cardinality/CardinalityAggregatorTest.java @@ -28,9 +28,12 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.druid.jackson.DefaultObjectMapper; import io.druid.js.JavaScriptConfig; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; +import io.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.ExtractionDimensionSpec; @@ -244,25 +247,44 @@ public class CardinalityAggregatorTest } } + List> dimInfoList; List selectorList; CardinalityAggregatorFactory rowAggregatorFactory; CardinalityAggregatorFactory valueAggregatorFactory; final TestDimensionSelector dim1; final TestDimensionSelector dim2; + List> dimInfoListWithExtraction; List selectorListWithExtraction; final TestDimensionSelector dim1WithExtraction; final TestDimensionSelector dim2WithExtraction; + List> dimInfoListConstantVal; List selectorListConstantVal; final TestDimensionSelector dim1ConstantVal; final TestDimensionSelector dim2ConstantVal; + final DimensionSpec dimSpec1 = new DefaultDimensionSpec("dim1", "dim1"); + final DimensionSpec dimSpec2 = new DefaultDimensionSpec("dim2", "dim2"); + public CardinalityAggregatorTest() { dim1 = new TestDimensionSelector(values1, null); dim2 = new TestDimensionSelector(values2, null); + dimInfoList = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim1 + ), + new ColumnSelectorPlus( + dimSpec2.getDimension(), + dimSpec2.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim2 + ) + ); + selectorList = Lists.newArrayList( (DimensionSelector) dim1, dim2 @@ -271,8 +293,8 @@ public class CardinalityAggregatorTest rowAggregatorFactory = new CardinalityAggregatorFactory( "billy", Lists.newArrayList( - new DefaultDimensionSpec("dim1", "dim1"), - new DefaultDimensionSpec("dim2", "dim2") + dimSpec1, + dimSpec2 ), true ); @@ -280,8 +302,8 @@ public class CardinalityAggregatorTest valueAggregatorFactory = new CardinalityAggregatorFactory( "billy", Lists.newArrayList( - new DefaultDimensionSpec("dim1", "dim1"), - new DefaultDimensionSpec("dim2", "dim2") + dimSpec1, + dimSpec2 ), false ); @@ -295,6 +317,18 @@ public class CardinalityAggregatorTest (DimensionSelector) dim1WithExtraction, dim2WithExtraction ); + dimInfoListWithExtraction = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim1WithExtraction + ), + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim2WithExtraction + ) + ); String helloJsFn = "function(str) { return 'hello' }"; ExtractionFn helloFn = new JavaScriptExtractionFn(helloJsFn, false, JavaScriptConfig.getDefault()); @@ -304,13 +338,27 @@ public class CardinalityAggregatorTest (DimensionSelector) dim1ConstantVal, dim2ConstantVal ); + dimInfoListConstantVal = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim1ConstantVal + ), + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim2ConstantVal + ) + ); + } @Test public void testAggregateRows() throws Exception { CardinalityAggregator agg = new CardinalityAggregator( - selectorList, + "billy", + dimInfoList, true ); @@ -325,7 +373,8 @@ public class CardinalityAggregatorTest public void testAggregateValues() throws Exception { CardinalityAggregator agg = new CardinalityAggregator( - selectorList, + "billy", + dimInfoList, false ); @@ -339,7 +388,7 @@ public class CardinalityAggregatorTest public void testBufferAggregateRows() throws Exception { CardinalityBufferAggregator agg = new CardinalityBufferAggregator( - selectorList, + dimInfoList, true ); @@ -360,7 +409,7 @@ public class CardinalityAggregatorTest public void testBufferAggregateValues() throws Exception { CardinalityBufferAggregator agg = new CardinalityBufferAggregator( - selectorList, + dimInfoList, false ); @@ -382,9 +431,23 @@ public class CardinalityAggregatorTest { List selector1 = Lists.newArrayList((DimensionSelector) dim1); List selector2 = Lists.newArrayList((DimensionSelector) dim2); + List> dimInfo1 = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim1 + ) + ); + List> dimInfo2 = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim2 + ) + ); - CardinalityAggregator agg1 = new CardinalityAggregator(selector1, true); - CardinalityAggregator agg2 = new CardinalityAggregator(selector2, true); + CardinalityAggregator agg1 = new CardinalityAggregator("billy", dimInfo1, true); + CardinalityAggregator agg2 = new CardinalityAggregator("billy", dimInfo2, true); for (int i = 0; i < values1.size(); ++i) { aggregate(selector1, agg1); @@ -414,8 +477,23 @@ public class CardinalityAggregatorTest List selector1 = Lists.newArrayList((DimensionSelector) dim1); List selector2 = Lists.newArrayList((DimensionSelector) dim2); - CardinalityAggregator agg1 = new CardinalityAggregator(selector1, false); - CardinalityAggregator agg2 = new CardinalityAggregator(selector2, false); + List> dimInfo1 = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim1 + ) + ); + List> dimInfo2 = Lists.newArrayList( + new ColumnSelectorPlus( + dimSpec1.getDimension(), + dimSpec1.getOutputName(), + new StringCardinalityAggregatorColumnSelectorStrategy(), dim2 + ) + ); + + CardinalityAggregator agg1 = new CardinalityAggregator("billy", dimInfo1, false); + CardinalityAggregator agg2 = new CardinalityAggregator("billy", dimInfo2, false); for (int i = 0; i < values1.size(); ++i) { aggregate(selector1, agg1); @@ -443,7 +521,8 @@ public class CardinalityAggregatorTest public void testAggregateRowsWithExtraction() throws Exception { CardinalityAggregator agg = new CardinalityAggregator( - selectorListWithExtraction, + "billy", + dimInfoListWithExtraction, true ); for (int i = 0; i < values1.size(); ++i) { @@ -452,7 +531,8 @@ public class CardinalityAggregatorTest Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05); CardinalityAggregator agg2 = new CardinalityAggregator( - selectorListConstantVal, + "billy", + dimInfoListConstantVal, true ); for (int i = 0; i < values1.size(); ++i) { @@ -465,7 +545,8 @@ public class CardinalityAggregatorTest public void testAggregateValuesWithExtraction() throws Exception { CardinalityAggregator agg = new CardinalityAggregator( - selectorListWithExtraction, + "billy", + dimInfoListWithExtraction, false ); for (int i = 0; i < values1.size(); ++i) { @@ -474,7 +555,8 @@ public class CardinalityAggregatorTest Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get()), 0.05); CardinalityAggregator agg2 = new CardinalityAggregator( - selectorListConstantVal, + "billy", + dimInfoListConstantVal, false ); for (int i = 0; i < values1.size(); ++i) { diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 9949041c27f..5cdb371e684 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -794,6 +794,7 @@ public class GroupByQueryRunnerTest ); Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + List res = Lists.newArrayList(results); TestHelper.assertExpectedObjects(expectedResults, results, ""); } @@ -6877,4 +6878,43 @@ public class GroupByQueryRunnerTest Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); TestHelper.assertExpectedObjects(expectedResults, results, ""); } + + @Test + public void testGroupByCardinalityAggOnFloat() + { + GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.dataSource) + .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird) + .setDimensions(Lists.newArrayList(new DefaultDimensionSpec("market", "alias"))) + .setAggregatorSpecs( + Arrays.asList( + QueryRunnerTestHelper.rowsCount, + new CardinalityAggregatorFactory( + "numVals", + ImmutableList.of(new DefaultDimensionSpec( + QueryRunnerTestHelper.indexMetric, + QueryRunnerTestHelper.indexMetric + )), + false + ) + ) + ) + .setGranularity(QueryRunnerTestHelper.dayGran) + .build(); + + // CardinalityAggregator currently treats non-String columns as having all nulls, so cardinality is 1 for + // the 'index' column + List expectedResults = Arrays.asList( + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "spot", "rows", 9L, "numVals", 1.0002442201269182d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "total_market", "rows", 2L, "numVals", 1.0002442201269182d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "upfront", "rows", 2L, "numVals", 1.0002442201269182d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "spot", "rows", 9L, "numVals", 1.0002442201269182d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "total_market", "rows", 2L, "numVals", 1.0002442201269182d), + GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "upfront", "rows", 2L, "numVals", 1.0002442201269182d) + ); + + Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query); + TestHelper.assertExpectedObjects(expectedResults, results, ""); + } } diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 10586137931..6f6b5c38f88 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -31,6 +31,7 @@ import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; import io.druid.query.dimension.ExtractionDimensionSpec; import io.druid.query.extraction.MapLookupExtractor; +import io.druid.query.extraction.TimeFormatExtractionFn; import io.druid.query.filter.AndDimFilter; import io.druid.query.filter.DimFilter; import io.druid.query.filter.ExtractionDimFilter; @@ -44,6 +45,7 @@ import io.druid.query.search.search.SearchQueryConfig; import io.druid.query.search.search.SearchSortSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.segment.TestHelper; +import io.druid.segment.column.Column; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; @@ -602,6 +604,33 @@ public class SearchQueryRunnerTest checkSearchQuery(searchQuery, expectedHits); } + @Test + public void testSearchOnTime() + { + SearchQuery searchQuery = Druids.newSearchQueryBuilder() + .dataSource(QueryRunnerTestHelper.dataSource) + .granularity(QueryRunnerTestHelper.allGran) + .intervals(QueryRunnerTestHelper.fullOnInterval) + .query("Friday") + .dimensions(new ExtractionDimensionSpec( + Column.TIME_COLUMN_NAME, + "__time2", + new TimeFormatExtractionFn( + "EEEE", + null, + null, + null, + false + ) + )) + .build(); + + List expectedHits = Lists.newLinkedList(); + expectedHits.add(new SearchHit("__time2", "Friday", 169)); + + checkSearchQuery(searchQuery, expectedHits); + } + private void checkSearchQuery(Query searchQuery, List expectedResults) {