diff --git a/common/src/main/java/io/druid/common/guava/SettableSupplier.java b/common/src/main/java/io/druid/common/guava/SettableSupplier.java new file mode 100644 index 00000000000..01d07a64ef4 --- /dev/null +++ b/common/src/main/java/io/druid/common/guava/SettableSupplier.java @@ -0,0 +1,50 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.common.guava; + +import com.google.common.base.Supplier; + +/** + * A settable Supplier. Not thread safe. + */ +public class SettableSupplier implements Supplier +{ + private T obj; + + public SettableSupplier() + { + } + + public SettableSupplier(T initialValue) + { + obj = initialValue; + } + + public void set(T obj) + { + this.obj = obj; + } + + @Override + public T get() + { + return obj; + } +} diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java index 4ef717dacd4..a8b25460b1c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java @@ -89,8 +89,7 @@ public class InputRowSerde IncrementalIndex.makeColumnSelectorFactory( aggFactory, supplier, - true, - null + true ) ); try { diff --git a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java index 2ea3b27c0c1..ec08a0cfb8a 100644 --- a/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java +++ b/processing/src/main/java/io/druid/query/aggregation/FilteredAggregatorFactory.java @@ -21,23 +21,9 @@ package io.druid.query.aggregation; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import io.druid.query.ColumnSelectorPlus; -import io.druid.query.dimension.DefaultDimensionSpec; -import io.druid.query.dimension.DimensionSpec; import io.druid.query.filter.DimFilter; -import io.druid.query.filter.DruidLongPredicate; -import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; -import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; -import io.druid.query.filter.ValueMatcherFactory; import io.druid.segment.ColumnSelectorFactory; -import io.druid.segment.DimensionHandlerUtils; -import io.druid.segment.column.Column; -import io.druid.segment.column.ColumnCapabilities; -import io.druid.segment.column.ValueType; -import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; import java.nio.ByteBuffer; @@ -66,8 +52,7 @@ public class FilteredAggregatorFactory 
extends AggregatorFactory @Override public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory) { - final ValueMatcherFactory valueMatcherFactory = new FilteredAggregatorValueMatcherFactory(columnSelectorFactory); - final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(valueMatcherFactory); + final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(columnSelectorFactory); return new FilteredAggregator( valueMatcher, delegate.factorize(columnSelectorFactory) @@ -77,8 +62,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory) { - final ValueMatcherFactory valueMatcherFactory = new FilteredAggregatorValueMatcherFactory(columnSelectorFactory); - final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(valueMatcherFactory); + final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(columnSelectorFactory); return new FilteredBufferAggregator( valueMatcher, delegate.factorizeBuffered(columnSelectorFactory) @@ -208,81 +192,4 @@ public class FilteredAggregatorFactory extends AggregatorFactory result = 31 * result + (filter != null ? filter.hashCode() : 0); return result; } - - private static class FilteredAggregatorValueMatcherFactory implements ValueMatcherFactory - { - private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY = - new ValueMatcherColumnSelectorStrategyFactory(); - - private final ColumnSelectorFactory columnSelectorFactory; - - public FilteredAggregatorValueMatcherFactory(ColumnSelectorFactory columnSelectorFactory) - { - this.columnSelectorFactory = columnSelectorFactory; - } - - @Override - public ValueMatcher makeValueMatcher(final String dimension, final String value) - { - if (getTypeForDimension(dimension) == ValueType.LONG) { - return Filters.getLongValueMatcher( - columnSelectorFactory.makeLongColumnSelector(dimension), - value - ); - } - - ColumnSelectorPlus[] selector = - DimensionHandlerUtils.createColumnSelectorPluses( - STRATEGY_FACTORY, - ImmutableList.of(DefaultDimensionSpec.of(dimension)), - columnSelectorFactory - ); - - - final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); - return strategy.getValueMatcher(dimension, columnSelectorFactory, value); - } - - public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory) - { - ValueType type = getTypeForDimension(dimension); - switch (type) { - case LONG: - return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate()); - case STRING: - ColumnSelectorPlus[] selector = - DimensionHandlerUtils.createColumnSelectorPluses( - STRATEGY_FACTORY, - ImmutableList.of(DefaultDimensionSpec.of(dimension)), - columnSelectorFactory - ); - - - final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); - return strategy.getValueMatcher(dimension, columnSelectorFactory, predicateFactory); - default: - return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null)); - } - } - - private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate) - { - return Filters.getLongPredicateMatcher( - columnSelectorFactory.makeLongColumnSelector(dimension), - predicate - ); - } - - private ValueType getTypeForDimension(String dimension) - { - // FilteredAggregatorFactory is sometimes created from a ColumnSelectorFactory that - // has no knowledge of column 
capabilities/types. - // Default to LONG for __time, STRING for everything else. - if (dimension.equals(Column.TIME_COLUMN_NAME)) { - return ValueType.LONG; - } - ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimension); - return capabilities == null ? ValueType.STRING : capabilities.getType(); - } - } } diff --git a/processing/src/main/java/io/druid/query/filter/BooleanFilter.java b/processing/src/main/java/io/druid/query/filter/BooleanFilter.java index d1143370406..e77219c5f6a 100644 --- a/processing/src/main/java/io/druid/query/filter/BooleanFilter.java +++ b/processing/src/main/java/io/druid/query/filter/BooleanFilter.java @@ -19,6 +19,8 @@ package io.druid.query.filter; +import io.druid.segment.ColumnSelectorFactory; + import java.util.List; public interface BooleanFilter extends Filter @@ -33,16 +35,17 @@ public interface BooleanFilter extends Filter * An implementation should either: * - return a ValueMatcher that checks row values, using the provided ValueMatcherFactory * - or, if possible, get a bitmap index for this filter using the BitmapIndexSelector, and - * return a ValueMatcher that checks the current row offset, created using the bitmap index. + * return a ValueMatcher that checks the current row offset, created using the bitmap index. * - * @param selector Object used to retrieve bitmap indexes - * @param valueMatcherFactory Object used to create ValueMatchers + * @param selector Object used to retrieve bitmap indexes + * @param columnSelectorFactory Object used to select columns for making ValueMatchers * @param rowOffsetMatcherFactory Object used to create RowOffsetMatchers + * * @return ValueMatcher that applies this filter */ public ValueMatcher makeMatcher( BitmapIndexSelector selector, - ValueMatcherFactory valueMatcherFactory, + ColumnSelectorFactory columnSelectorFactory, RowOffsetMatcherFactory rowOffsetMatcherFactory ); } diff --git a/processing/src/main/java/io/druid/query/filter/BoundDimFilter.java b/processing/src/main/java/io/druid/query/filter/BoundDimFilter.java index 8ffa0217fa8..cf9236617b9 100644 --- a/processing/src/main/java/io/druid/query/filter/BoundDimFilter.java +++ b/processing/src/main/java/io/druid/query/filter/BoundDimFilter.java @@ -314,22 +314,27 @@ public class BoundDimFilter implements DimFilter private Supplier makeLongPredicateSupplier() { - return new Supplier() + class BoundLongPredicateSupplier implements Supplier { private final Object initLock = new Object(); + + // longsInitialized is volatile since it establishes the happens-before relationship on + // writes/reads to the rest of the fields (it's written last and read first). private volatile boolean longsInitialized = false; - private volatile boolean matchesAnything = true; - private volatile boolean hasLowerLongBoundVolatile; - private volatile boolean hasUpperLongBoundVolatile; - private volatile long lowerLongBoundVolatile; - private volatile long upperLongBoundVolatile; + + // Other fields are not volatile. 
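Illustrative aside, not taken from this patch: the comment above relies on publishing plain fields through a single volatile flag that is written last and read first. A minimal, self-contained sketch of that idiom (all names hypothetical):

    class LazyLongBounds
    {
      private final Object initLock = new Object();

      // Written last and read first; its happens-before edge publishes the plain fields below.
      private volatile boolean initialized = false;

      // Not volatile; visibility is guaranteed by reading "initialized" before these.
      private long lowerBound;
      private long upperBound;

      long[] get()
      {
        if (!initialized) {              // volatile read
          synchronized (initLock) {
            if (!initialized) {
              lowerBound = 1L;           // plain writes happen first...
              upperBound = 100L;
              initialized = true;        // ...then the volatile write publishes them
            }
          }
        }
        return new long[]{lowerBound, upperBound};
      }
    }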
+ private boolean matchesNothing; + private boolean hasLowerLongBound; + private boolean hasUpperLongBound; + private long lowerLongBound; + private long upperLongBound; @Override public DruidLongPredicate get() { initLongData(); - if (!matchesAnything) { + if (matchesNothing) { return new DruidLongPredicate() { @Override @@ -342,10 +347,10 @@ public class BoundDimFilter implements DimFilter return new DruidLongPredicate() { - private final boolean hasLowerLongBound = hasLowerLongBoundVolatile; - private final boolean hasUpperLongBound = hasUpperLongBoundVolatile; - private final long lowerLongBound = hasLowerLongBound ? lowerLongBoundVolatile : 0L; - private final long upperLongBound = hasUpperLongBound ? upperLongBoundVolatile : 0L; + private final boolean hasLowerLongBound = BoundLongPredicateSupplier.this.hasLowerLongBound; + private final boolean hasUpperLongBound = BoundLongPredicateSupplier.this.hasUpperLongBound; + private final long lowerLongBound = hasLowerLongBound ? BoundLongPredicateSupplier.this.lowerLongBound : 0L; + private final long upperLongBound = hasUpperLongBound ? BoundLongPredicateSupplier.this.upperLongBound : 0L; @Override public boolean applyLong(long input) @@ -382,35 +387,39 @@ public class BoundDimFilter implements DimFilter return; } + matchesNothing = false; + if (hasLowerBound()) { final Long lowerLong = GuavaUtils.tryParseLong(lower); if (lowerLong == null) { - matchesAnything = false; - return; + // Unparseable values fall before all actual numbers, so all numbers will match the lower bound. + hasLowerLongBound = false; + } else { + hasLowerLongBound = true; + lowerLongBound = lowerLong; } - - hasLowerLongBoundVolatile = true; - lowerLongBoundVolatile = lowerLong; } else { - hasLowerLongBoundVolatile = false; + hasLowerLongBound = false; } if (hasUpperBound()) { Long upperLong = GuavaUtils.tryParseLong(upper); if (upperLong == null) { - matchesAnything = false; + // Unparseable values fall before all actual numbers, so no numbers can match the upper bound. + matchesNothing = true; return; } - hasUpperLongBoundVolatile = true; - upperLongBoundVolatile = upperLong; + hasUpperLongBound = true; + upperLongBound = upperLong; } else { - hasUpperLongBoundVolatile = false; + hasUpperLongBound = false; } longsInitialized = true; } } - }; + } + return new BoundLongPredicateSupplier(); } } diff --git a/processing/src/main/java/io/druid/query/filter/Filter.java b/processing/src/main/java/io/druid/query/filter/Filter.java index 26d634ded49..60f38269494 100644 --- a/processing/src/main/java/io/druid/query/filter/Filter.java +++ b/processing/src/main/java/io/druid/query/filter/Filter.java @@ -20,6 +20,7 @@ package io.druid.query.filter; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.segment.ColumnSelectorFactory; /** */ @@ -40,7 +41,7 @@ public interface Filter * @param factory Object used to create ValueMatchers * @return ValueMatcher that applies this filter to row values. 
*/ - public ValueMatcher makeMatcher(ValueMatcherFactory factory); + public ValueMatcher makeMatcher(ColumnSelectorFactory factory); /** diff --git a/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java index 8a1207c3542..88b21b314aa 100644 --- a/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/StringValueMatcherColumnSelectorStrategy.java @@ -21,30 +21,29 @@ package io.druid.query.filter; import com.google.common.base.Predicate; import com.google.common.base.Strings; -import io.druid.query.dimension.DefaultDimensionSpec; -import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; import io.druid.segment.data.IndexedInts; +import io.druid.segment.filter.BooleanValueMatcher; import java.util.BitSet; import java.util.Objects; -public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherColumnSelectorStrategy +public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherColumnSelectorStrategy { @Override - public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final String value) + public ValueMatcher makeValueMatcher(final DimensionSelector selector, final String value) { final String valueStr = Strings.emptyToNull(value); - final DimensionSelector selector = cursor.makeDimensionSelector( - new DefaultDimensionSpec(columnName, columnName) - ); // if matching against null, rows with size 0 should also match final boolean matchNull = Strings.isNullOrEmpty(valueStr); final int cardinality = selector.getValueCardinality(); - if (cardinality >= 0) { + if (cardinality == 0 || (cardinality == 1 && selector.lookupName(0) == null)) { + // All values are null or empty rows (which match nulls anyway). No need to check each row. + return new BooleanValueMatcher(matchNull); + } else if (cardinality >= 0) { // Dictionary-encoded dimension. Compare by id instead of by value to save time. final int valueId = selector.lookupId(valueStr); @@ -94,17 +93,19 @@ public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherCol } @Override - public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory) + public ValueMatcher makeValueMatcher( + final DimensionSelector selector, + final DruidPredicateFactory predicateFactory + ) { - final DimensionSelector selector = cursor.makeDimensionSelector( - new DefaultDimensionSpec(columnName, columnName) - ); - final Predicate predicate = predicateFactory.makeStringPredicate(); final int cardinality = selector.getValueCardinality(); final boolean matchNull = predicate.apply(null); - if (cardinality >= 0) { + if (cardinality == 0 || (cardinality == 1 && selector.lookupName(0) == null)) { + // All values are null or empty rows (which match nulls anyway). No need to check each row. + return new BooleanValueMatcher(matchNull); + } else if (cardinality >= 0) { // Dictionary-encoded dimension. Check every value; build a bitset of matching ids. 
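Aside (illustrative only, not part of the patch): the two-step approach the surrounding strategy code takes — evaluate the predicate once per dictionary entry, then match rows by id — shown in a standalone form, with plain arrays standing in for Druid's DimensionSelector and IndexedInts:

    import com.google.common.base.Predicate;
    import java.util.BitSet;

    class DictionaryMatchSketch
    {
      // Evaluate the predicate once per dictionary entry.
      static BitSet matchingIds(String[] dictionary, Predicate<String> predicate)
      {
        final BitSet ids = new BitSet(dictionary.length);
        for (int i = 0; i < dictionary.length; i++) {
          if (predicate.apply(dictionary[i])) {
            ids.set(i);
          }
        }
        return ids;
      }

      // A row is represented by the dictionary ids of its values; a multi-value row matches
      // if any of its ids was precomputed as matching. (The real strategy additionally treats
      // an empty row as null via "matchNull".)
      static boolean rowMatches(BitSet matchingIds, int[] rowIds)
      {
        for (int id : rowIds) {
          if (matchingIds.get(id)) {
            return true;
          }
        }
        return false;
      }
    }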
final BitSet valueIds = new BitSet(cardinality); for (int i = 0; i < cardinality; i++) { diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java index 6e6c747bd4d..5b2373df5d9 100644 --- a/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategy.java @@ -20,26 +20,25 @@ package io.druid.query.filter; import io.druid.query.dimension.ColumnSelectorStrategy; -import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; -public interface ValueMatcherColumnSelectorStrategy extends ColumnSelectorStrategy +public interface ValueMatcherColumnSelectorStrategy extends ColumnSelectorStrategy { /** - * Create a single value ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory. + * Create a single value ValueMatcher. * - * @param cursor ColumnSelectorFactory for creating dimension value selectors + * @param selector Column selector * @param value Value to match against * @return ValueMatcher that matches on 'value' */ - ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, String value); - + ValueMatcher makeValueMatcher(ValueSelectorType selector, String value); /** - * Create a predicate-based ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory. + * Create a predicate-based ValueMatcher. * - * @param cursor ColumnSelectorFactory for creating dimension value selectors + * @param selector Column selector * @param predicateFactory A DruidPredicateFactory that provides the filter predicates to be matched * @return A ValueMatcher that applies the predicate for this DimensionQueryHelper's value type from the predicateFactory */ - ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory); + ValueMatcher makeValueMatcher(ValueSelectorType selector, DruidPredicateFactory predicateFactory); } diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java index 828189471d1..07bc477675b 100644 --- a/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcherColumnSelectorStrategyFactory.java @@ -27,6 +27,18 @@ import io.druid.segment.column.ValueType; public class ValueMatcherColumnSelectorStrategyFactory implements ColumnSelectorStrategyFactory { + private static final ValueMatcherColumnSelectorStrategyFactory INSTANCE = new ValueMatcherColumnSelectorStrategyFactory(); + + private ValueMatcherColumnSelectorStrategyFactory() + { + // Singleton. + } + + public static ValueMatcherColumnSelectorStrategyFactory instance() + { + return INSTANCE; + } + @Override public ValueMatcherColumnSelectorStrategy makeColumnSelectorStrategy( ColumnCapabilities capabilities diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java deleted file mode 100644 index 33e0c57aa8c..00000000000 --- a/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query.filter; - -/** - * A ValueMatcherFactory is an object associated with a collection of rows (e.g., an IncrementalIndexStorageAdapter) - * that generates ValueMatchers for filtering on the associated collection of rows. - * - * A ValueMatcher is an object that decides whether a row value matches a value or predicate - * associated with the ValueMatcher. - * - * The ValueMatcher is expected to track the current row to be matched with a stateful - * object (e.g., a ColumnSelectorFactory). The ValueMatcher has no responsibility for moving the current - * "row pointer", this is handled outside of the ValueMatcher. - * - * The ValueMatcherFactory/ValueMatcher classes are used for filtering rows during column scans. - */ -public interface ValueMatcherFactory -{ - /** - * Create a ValueMatcher that compares row values to the provided string. - * - * An implementation of this method should be able to handle dimensions of various types. - * - * @param dimension The dimension to filter. - * @param value The value to match against, represented as a String. - * - * @return An object that matches row values on the provided value. - */ - public ValueMatcher makeValueMatcher(String dimension, String value); - - /** - * Create a ValueMatcher that applies a predicate to row values. - * - * The caller provides a predicate factory that can create a predicate for each value type supported by Druid. - * See {@link DruidPredicateFactory} for more information. - * - * When creating the ValueMatcher, the ValueMatcherFactory implementation should decide what type of predicate - * to create from the predicate factory based on the ValueType of the specified dimension. - * - * @param dimension The dimension to filter. - * @param predicateFactory Predicate factory - * @return An object that applies a predicate to row values - */ - public ValueMatcher makeValueMatcher(String dimension, DruidPredicateFactory predicateFactory); -} diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java index 7df97a4c56e..c5a67d0a178 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQuery.java @@ -143,6 +143,7 @@ public class GroupByQuery extends BaseQuery @Override public Sequence apply(Sequence input) { + GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this)); return Sequences.filter( input, new Predicate() @@ -369,6 +370,15 @@ public class GroupByQuery extends BaseQuery return 0; } + /** + * Apply the havingSpec and limitSpec. 
Because havingSpecs are not thread safe, and because they are applied during + * accumulation of the returned sequence, callers must take care to avoid accumulating two different Sequences + * returned by this method in two different threads. + * + * @param results sequence of rows to apply havingSpec and limitSpec to + * + * @return sequence of rows after applying havingSpec and limitSpec + */ public Sequence applyLimit(Sequence results) { return limitFn.apply(results); diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 1fda3890aa5..1d3684e20dd 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -19,7 +19,9 @@ package io.druid.query.groupby; +import com.google.common.base.Enums; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; @@ -37,6 +39,7 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.ResourceLimitExceededException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.dimension.DimensionSpec; +import io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; @@ -45,6 +48,7 @@ import io.druid.segment.incremental.OnheapIncrementalIndex; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -219,4 +223,34 @@ public class GroupByQueryHelper } ); } + + /** + * Returns types for fields that will appear in the Rows output from "query". Useful for feeding them into + * {@link RowBasedColumnSelectorFactory}. + * + * @param query groupBy query + * + * @return row types + */ + public static Map rowSignatureFor(final GroupByQuery query) + { + final ImmutableMap.Builder types = ImmutableMap.builder(); + + for (DimensionSpec dimensionSpec : query.getDimensions()) { + types.put(dimensionSpec.getOutputName(), ValueType.STRING); + } + + for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) { + final String typeName = aggregatorFactory.getTypeName(); + final ValueType valueType = typeName != null + ? Enums.getIfPresent(ValueType.class, typeName.toUpperCase()).orNull() + : null; + if (valueType != null) { + types.put(aggregatorFactory.getName(), valueType); + } + } + + // Don't include post-aggregators since we don't know what types they are. + return types.build(); + } } diff --git a/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java b/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java new file mode 100644 index 00000000000..378543e48f8 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/RowBasedColumnSelectorFactory.java @@ -0,0 +1,297 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby; + +import com.google.common.base.Strings; +import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import io.druid.data.input.Row; +import io.druid.math.expr.Evals; +import io.druid.math.expr.Expr; +import io.druid.math.expr.Parser; +import io.druid.query.dimension.DimensionSpec; +import io.druid.query.extraction.ExtractionFn; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionSelector; +import io.druid.segment.FloatColumnSelector; +import io.druid.segment.LongColumnSelector; +import io.druid.segment.NumericColumnSelector; +import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.Column; +import io.druid.segment.column.ColumnCapabilities; +import io.druid.segment.column.ColumnCapabilitiesImpl; +import io.druid.segment.column.ValueType; +import io.druid.segment.data.IndexedInts; +import io.druid.segment.data.RangeIndexedInts; +import io.druid.segment.data.ZeroIndexedInts; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; + +public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory +{ + private final Supplier row; + private final Map rowSignature; + + private RowBasedColumnSelectorFactory( + final Supplier row, + @Nullable final Map rowSignature + ) + { + this.row = row; + this.rowSignature = rowSignature != null ? rowSignature : ImmutableMap.of(); + } + + public static RowBasedColumnSelectorFactory create( + final Supplier row, + @Nullable final Map rowSignature + ) + { + return new RowBasedColumnSelectorFactory(row, rowSignature); + } + + public static RowBasedColumnSelectorFactory create( + final ThreadLocal row, + @Nullable final Map rowSignature + ) + { + return new RowBasedColumnSelectorFactory( + new Supplier() + { + @Override + public Row get() + { + return row.get(); + } + }, + rowSignature + ); + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + // This dimension selector does not have an associated lookup dictionary, which means lookup can only be done + // on the same row. Hence it returns CARDINALITY_UNKNOWN from getValueCardinality. 
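Illustrative aside, not from this patch itself: RowBasedColumnSelectorFactory is driven by a Supplier of Rows rather than a cursor, so callers advance the "row pointer" by resetting the supplier while reusing the same selectors. A minimal sketch of that usage, assuming the classes added in this patch and a hypothetical "count" long metric:

    import com.google.common.collect.ImmutableMap;
    import io.druid.common.guava.SettableSupplier;
    import io.druid.data.input.Row;
    import io.druid.query.groupby.RowBasedColumnSelectorFactory;
    import io.druid.segment.ColumnSelectorFactory;
    import io.druid.segment.LongColumnSelector;
    import io.druid.segment.column.ValueType;

    import java.util.List;

    class RowFactoryUsageSketch
    {
      static long sumCounts(List<Row> rows)
      {
        final SettableSupplier<Row> rowSupplier = new SettableSupplier<>();
        final ColumnSelectorFactory factory = RowBasedColumnSelectorFactory.create(
            rowSupplier,
            ImmutableMap.of("count", ValueType.LONG)   // hypothetical row signature
        );

        // Selectors are created once and always read whatever row the supplier currently holds.
        final LongColumnSelector countSelector = factory.makeLongColumnSelector("count");

        long sum = 0;
        for (Row row : rows) {
          rowSupplier.set(row);   // move the "row pointer"; selectors now see this row
          sum += countSelector.get();
        }
        return sum;
      }
    }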
+ return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); + } + + private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensionSpec) + { + final String dimension = dimensionSpec.getDimension(); + final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); + + if (Column.TIME_COLUMN_NAME.equals(dimensionSpec.getDimension())) { + if (extractionFn == null) { + throw new UnsupportedOperationException("time dimension must provide an extraction function"); + } + + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return ZeroIndexedInts.instance(); + } + + @Override + public int getValueCardinality() + { + return DimensionSelector.CARDINALITY_UNKNOWN; + } + + @Override + public String lookupName(int id) + { + return extractionFn.apply(row.get().getTimestampFromEpoch()); + } + + @Override + public int lookupId(String name) + { + throw new UnsupportedOperationException("lookupId"); + } + }; + } else { + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + final List dimensionValues = row.get().getDimension(dimension); + return RangeIndexedInts.create(dimensionValues != null ? dimensionValues.size() : 0); + } + + @Override + public int getValueCardinality() + { + return DimensionSelector.CARDINALITY_UNKNOWN; + } + + @Override + public String lookupName(int id) + { + final String value = Strings.emptyToNull(row.get().getDimension(dimension).get(id)); + return extractionFn == null ? value : extractionFn.apply(value); + } + + @Override + public int lookupId(String name) + { + throw new UnsupportedOperationException("lookupId"); + } + }; + } + } + + @Override + public FloatColumnSelector makeFloatColumnSelector(final String columnName) + { + if (columnName.equals(Column.TIME_COLUMN_NAME)) { + return new FloatColumnSelector() + { + @Override + public float get() + { + return (float) row.get().getTimestampFromEpoch(); + } + }; + } else { + return new FloatColumnSelector() + { + @Override + public float get() + { + return row.get().getFloatMetric(columnName); + } + }; + } + } + + @Override + public LongColumnSelector makeLongColumnSelector(final String columnName) + { + if (columnName.equals(Column.TIME_COLUMN_NAME)) { + return new LongColumnSelector() + { + @Override + public long get() + { + return row.get().getTimestampFromEpoch(); + } + }; + } else { + return new LongColumnSelector() + { + @Override + public long get() + { + return row.get().getLongMetric(columnName); + } + }; + } + } + + @Override + public ObjectColumnSelector makeObjectColumnSelector(final String columnName) + { + if (columnName.equals(Column.TIME_COLUMN_NAME)) { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Long.class; + } + + @Override + public Object get() + { + return row.get().getTimestampFromEpoch(); + } + }; + } else { + return new ObjectColumnSelector() + { + @Override + public Class classOfObject() + { + return Object.class; + } + + @Override + public Object get() + { + return row.get().getRaw(columnName); + } + }; + } + } + + @Override + public NumericColumnSelector makeMathExpressionSelector(String expression) + { + final Expr parsed = Parser.parse(expression); + + final List required = Parser.findRequiredBindings(parsed); + final Map> values = Maps.newHashMapWithExpectedSize(required.size()); + + for (final String columnName : required) { + values.put( + columnName, new Supplier() + { + @Override + public Number get() + { + return 
Evals.toNumber(row.get().getRaw(columnName)); + } + } + ); + } + final Expr.ObjectBinding binding = Parser.withSuppliers(values); + + return new NumericColumnSelector() + { + @Override + public Number get() + { + return parsed.eval(binding).numericValue(); + } + }; + } + + @Override + public ColumnCapabilities getColumnCapabilities(String columnName) + { + if (Column.TIME_COLUMN_NAME.equals(columnName)) { + // TIME_COLUMN_NAME is handled specially; override the provided rowSignature. + return new ColumnCapabilitiesImpl().setType(ValueType.LONG); + } else { + final ValueType valueType = rowSignature.get(columnName); + + // Do _not_ set isDictionaryEncoded or hasBitmapIndexes, because Row-based columns do not have those things. + return valueType != null + ? new ColumnCapabilitiesImpl().setType(valueType) + : new ColumnCapabilitiesImpl().setType(ValueType.STRING); + } + } +} diff --git a/processing/src/main/java/io/druid/query/groupby/RowBasedValueMatcherFactory.java b/processing/src/main/java/io/druid/query/groupby/RowBasedValueMatcherFactory.java deleted file mode 100644 index 7511998dc94..00000000000 --- a/processing/src/main/java/io/druid/query/groupby/RowBasedValueMatcherFactory.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package io.druid.query.groupby; - -import com.google.common.base.Predicate; -import com.google.common.base.Strings; -import io.druid.common.guava.GuavaUtils; -import io.druid.data.input.Row; -import io.druid.query.filter.DruidLongPredicate; -import io.druid.query.filter.DruidPredicateFactory; -import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; -import io.druid.segment.column.Column; -import io.druid.segment.filter.BooleanValueMatcher; - -import java.util.List; -import java.util.Objects; - -public class RowBasedValueMatcherFactory implements ValueMatcherFactory -{ - private Row row; - - public void setRow(Row row) - { - this.row = row; - } - - @Override - public ValueMatcher makeValueMatcher(final String dimension, final String value) - { - if (dimension.equals(Column.TIME_COLUMN_NAME)) { - if (value == null) { - return new BooleanValueMatcher(false); - } - - final Long longValue = GuavaUtils.tryParseLong(value); - if (longValue == null) { - return new BooleanValueMatcher(false); - } - - return new ValueMatcher() - { - // store the primitive, so we don't unbox for every comparison - final long unboxedLong = longValue; - - @Override - public boolean matches() - { - return row.getTimestampFromEpoch() == unboxedLong; - } - }; - } else { - final String valueOrNull = Strings.emptyToNull(value); - return new ValueMatcher() - { - @Override - public boolean matches() - { - return doesMatch(row.getRaw(dimension), valueOrNull); - } - }; - } - } - - // There is no easy way to determine column types from a Row, so this generates all possible predicates and then - // uses instanceof checks to determine which one to apply to each row. - @Override - public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory) - { - if (dimension.equals(Column.TIME_COLUMN_NAME)) { - return new ValueMatcher() - { - final DruidLongPredicate predicate = predicateFactory.makeLongPredicate(); - - @Override - public boolean matches() - { - return predicate.applyLong(row.getTimestampFromEpoch()); - } - }; - } else { - return new ValueMatcher() - { - final Predicate stringPredicate = predicateFactory.makeStringPredicate(); - final DruidLongPredicate longPredicate = predicateFactory.makeLongPredicate(); - - @Override - public boolean matches() - { - return doesMatch(row.getRaw(dimension), stringPredicate, longPredicate); - } - }; - } - } - - // Precondition: value must be run through Strings.emptyToNull - private boolean doesMatch(final Object raw, final String value) - { - if (raw == null) { - return value == null; - } else if (raw instanceof List) { - final List theList = (List) raw; - if (theList.isEmpty()) { - // null should match empty rows in multi-value columns - return value == null; - } else { - for (Object o : theList) { - if (doesMatch(o, value)) { - return true; - } - } - - return false; - } - } else { - return Objects.equals(Strings.emptyToNull(raw.toString()), value); - } - } - - private boolean doesMatch( - final Object raw, - final Predicate stringPredicate, - final DruidLongPredicate longPredicate - ) - { - if (raw == null) { - return stringPredicate.apply(null); - } else if (raw instanceof List) { - final List theList = (List) raw; - if (theList.isEmpty()) { - return stringPredicate.apply(null); - } else { - for (Object o : theList) { - if (doesMatch(o, stringPredicate, longPredicate)) { - return true; - } - } - - return false; - } - } else if (raw instanceof Long || raw instanceof Integer) { - return 
longPredicate.applyLong(((Number) raw).longValue()); - } else { - return stringPredicate.apply(Strings.emptyToNull(raw.toString())); - } - } -} diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java index 51164183688..c1c46792341 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByRowProcessor.java @@ -24,6 +24,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Lists; import io.druid.collections.BlockingPool; import io.druid.collections.ReferenceCountingResourceHolder; +import io.druid.common.guava.SettableSupplier; import io.druid.common.utils.JodaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.Pair; @@ -41,8 +42,9 @@ import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; -import io.druid.query.groupby.RowBasedValueMatcherFactory; +import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey; +import io.druid.segment.column.ValueType; import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.filter.Filters; import org.joda.time.DateTime; @@ -53,6 +55,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeoutException; @@ -61,6 +64,7 @@ public class GroupByRowProcessor public static Sequence process( final Query queryParam, final Sequence rows, + final Map rowSignature, final GroupByQueryConfig config, final BlockingPool mergeBufferPool, final ObjectMapper spillMapper @@ -86,10 +90,15 @@ public class GroupByRowProcessor query, Filters.toFilter(query.getDimFilter()) ); - final RowBasedValueMatcherFactory filterMatcherFactory = new RowBasedValueMatcherFactory(); + + final SettableSupplier rowSupplier = new SettableSupplier<>(); + final RowBasedColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create( + rowSupplier, + rowSignature + ); final ValueMatcher filterMatcher = filter == null ? 
new BooleanValueMatcher(true) - : filter.makeMatcher(filterMatcherFactory); + : filter.makeMatcher(columnSelectorFactory); final FilteredSequence filteredSequence = new FilteredSequence<>( rows, @@ -108,7 +117,7 @@ public class GroupByRowProcessor if (!inInterval) { return false; } - filterMatcherFactory.setRow(input); + rowSupplier.set(input); return filterMatcher.matches(); } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 33f014662fd..dbb3228b2ca 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Chars; @@ -36,31 +35,19 @@ import io.druid.data.input.Row; import io.druid.granularity.AllGranularity; import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Accumulator; -import io.druid.math.expr.Evals; -import io.druid.math.expr.Expr; -import io.druid.math.expr.Parser; import io.druid.query.QueryInterruptedException; import io.druid.query.aggregation.AggregatorFactory; -import io.druid.query.dimension.DimensionSpec; -import io.druid.query.extraction.ExtractionFn; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryHelper; +import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.query.groupby.strategy.GroupByStrategyV2; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionSelector; -import io.druid.segment.FloatColumnSelector; -import io.druid.segment.LongColumnSelector; -import io.druid.segment.NumericColumnSelector; -import io.druid.segment.ObjectColumnSelector; -import io.druid.segment.column.Column; -import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.data.IndexedInts; -import it.unimi.dsi.fastutil.ints.IntIterator; -import it.unimi.dsi.fastutil.ints.IntIterators; import org.joda.time.DateTime; import java.io.Closeable; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Comparator; @@ -93,7 +80,11 @@ public class RowBasedGrouperHelper query.getDimensions().size(), querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint) ); - final RowBasedColumnSelectorFactory columnSelectorFactory = new RowBasedColumnSelectorFactory(); + final ThreadLocal columnSelectorRow = new ThreadLocal<>(); + final ColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create( + columnSelectorRow, + GroupByQueryHelper.rowSignatureFor(query) + ); final Grouper grouper; if (concurrencyHint == -1) { grouper = new SpillingGrouper<>( @@ -150,8 +141,7 @@ public class RowBasedGrouperHelper return null; } - - columnSelectorFactory.setRow(row); + columnSelectorRow.set(row); final int dimStart; final Comparable[] key; @@ -193,7 +183,7 @@ public class RowBasedGrouperHelper // null return means grouping resources were exhausted. 
return null; } - columnSelectorFactory.setRow(null); + columnSelectorRow.set(null); return theGrouper; } @@ -623,197 +613,4 @@ public class RowBasedGrouperHelper return idx; } } - - private static class RowBasedColumnSelectorFactory implements ColumnSelectorFactory - { - private ThreadLocal row = new ThreadLocal<>(); - - public void setRow(Row row) - { - this.row.set(row); - } - - // This dimension selector does not have an associated lookup dictionary, which means lookup can only be done - // on the same row. This dimension selector is used for applying the extraction function on dimension, which - // requires a DimensionSelector implementation - @Override - public DimensionSelector makeDimensionSelector( - DimensionSpec dimensionSpec - ) - { - return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); - } - - private DimensionSelector makeDimensionSelectorUndecorated( - DimensionSpec dimensionSpec - ) - { - final String dimension = dimensionSpec.getDimension(); - final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); - - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - final List dimensionValues = row.get().getDimension(dimension); - - final int dimensionValuesSize = dimensionValues != null ? dimensionValues.size() : 0; - - return new IndexedInts() - { - @Override - public int size() - { - return dimensionValuesSize; - } - - @Override - public int get(int index) - { - if (index < 0 || index >= dimensionValuesSize) { - throw new IndexOutOfBoundsException("index: " + index); - } - return index; - } - - @Override - public IntIterator iterator() - { - return IntIterators.fromTo(0, dimensionValuesSize); - } - - @Override - public void close() throws IOException - { - - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - }; - } - - @Override - public int getValueCardinality() - { - return DimensionSelector.CARDINALITY_UNKNOWN; - } - - @Override - public String lookupName(int id) - { - final String value = row.get().getDimension(dimension).get(id); - return extractionFn == null ? 
value : extractionFn.apply(value); - } - - @Override - public int lookupId(String name) - { - if (extractionFn != null) { - throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function"); - } - return row.get().getDimension(dimension).indexOf(name); - } - }; - } - - @Override - public FloatColumnSelector makeFloatColumnSelector(final String columnName) - { - return new FloatColumnSelector() - { - @Override - public float get() - { - return row.get().getFloatMetric(columnName); - } - }; - } - - @Override - public LongColumnSelector makeLongColumnSelector(final String columnName) - { - if (columnName.equals(Column.TIME_COLUMN_NAME)) { - return new LongColumnSelector() - { - @Override - public long get() - { - return row.get().getTimestampFromEpoch(); - } - }; - } - return new LongColumnSelector() - { - @Override - public long get() - { - return row.get().getLongMetric(columnName); - } - }; - } - - @Override - public ObjectColumnSelector makeObjectColumnSelector(final String columnName) - { - return new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return row.get().getRaw(columnName); - } - }; - } - - @Override - public NumericColumnSelector makeMathExpressionSelector(String expression) - { - final Expr parsed = Parser.parse(expression); - - final List required = Parser.findRequiredBindings(parsed); - final Map> values = Maps.newHashMapWithExpectedSize(required.size()); - - for (final String columnName : required) { - values.put( - columnName, new Supplier() - { - @Override - public Number get() - { - return Evals.toNumber(row.get().getRaw(columnName)); - } - } - ); - } - final Expr.ObjectBinding binding = Parser.withSuppliers(values); - - return new NumericColumnSelector() - { - @Override - public Number get() - { - return parsed.eval(binding).numericValue(); - } - }; - } - - @Override - public ColumnCapabilities getColumnCapabilities(String columnName) - { - // We don't have any information on the column value type, returning null defaults type to string - return null; - } - } - } diff --git a/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java index 75aa8973d54..2279cac984f 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AlwaysHavingSpec.java @@ -24,7 +24,7 @@ import io.druid.data.input.Row; /** * A "having" spec that always evaluates to true */ -public class AlwaysHavingSpec implements HavingSpec +public class AlwaysHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x0; diff --git a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java index c8cb43c7c4a..a31d3537ade 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/AndHavingSpec.java @@ -23,14 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.segment.column.ValueType; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; /** * The logical "and" operator for the "having" clause. 
*/ -public class AndHavingSpec implements HavingSpec +public class AndHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x2; @@ -48,6 +50,14 @@ public class AndHavingSpec implements HavingSpec return havingSpecs; } + @Override + public void setRowSignature(Map rowSignature) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setRowSignature(rowSignature); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java new file mode 100644 index 00000000000..5700e89dea2 --- /dev/null +++ b/processing/src/main/java/io/druid/query/groupby/having/BaseHavingSpec.java @@ -0,0 +1,33 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.groupby.having; + +import io.druid.segment.column.ValueType; + +import java.util.Map; + +public abstract class BaseHavingSpec implements HavingSpec +{ + @Override + public void setRowSignature(Map rowSignature) + { + // Do nothing. 
+ } +} diff --git a/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java index 06ac202bcc9..12abeb35099 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/DimFilterHavingSpec.java @@ -22,20 +22,24 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import io.druid.common.guava.SettableSupplier; import io.druid.data.input.Row; import io.druid.query.filter.DimFilter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.groupby.RowBasedValueMatcherFactory; +import io.druid.query.groupby.RowBasedColumnSelectorFactory; +import io.druid.segment.column.ValueType; import java.nio.ByteBuffer; +import java.util.Map; -public class DimFilterHavingSpec implements HavingSpec +public class DimFilterHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = (byte) 0x9; private final DimFilter dimFilter; - private final RowBasedValueMatcherFactory valueMatcherFactory; - private final ValueMatcher valueMatcher; + private final SettableSupplier rowSupplier; + private ValueMatcher valueMatcher; + private int evalCount; @JsonCreator public DimFilterHavingSpec( @@ -43,8 +47,7 @@ public class DimFilterHavingSpec implements HavingSpec ) { this.dimFilter = Preconditions.checkNotNull(dimFilter, "filter"); - this.valueMatcherFactory = new RowBasedValueMatcherFactory(); - this.valueMatcher = dimFilter.toFilter().makeMatcher(valueMatcherFactory); + this.rowSupplier = new SettableSupplier<>(); } @JsonProperty("filter") @@ -53,17 +56,25 @@ public class DimFilterHavingSpec implements HavingSpec return dimFilter; } + @Override + public void setRowSignature(Map rowSignature) + { + this.valueMatcher = dimFilter.toFilter() + .makeMatcher(RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature)); + } + @Override public boolean eval(final Row row) { - // Not thread safe, but it doesn't have to be. - valueMatcherFactory.setRow(row); - try { - return valueMatcher.matches(); - } - finally { - valueMatcherFactory.setRow(null); + int oldEvalCount = evalCount; + evalCount++; + rowSupplier.set(row); + final boolean retVal = valueMatcher.matches(); + if (evalCount != oldEvalCount + 1) { + // Oops, someone was using this from two different threads, bad caller. 
+ throw new IllegalStateException("concurrent 'eval' calls not permitted!"); } + return retVal; } @Override diff --git a/processing/src/main/java/io/druid/query/groupby/having/DimensionSelectorHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/DimensionSelectorHavingSpec.java index f8780b10f8d..5aeade63cd9 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/DimensionSelectorHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/DimensionSelectorHavingSpec.java @@ -31,7 +31,7 @@ import io.druid.query.extraction.IdentityExtractionFn; import java.nio.ByteBuffer; import java.util.List; -public class DimensionSelectorHavingSpec implements HavingSpec +public class DimensionSelectorHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = (byte) 0x8; private static final byte STRING_SEPARATOR = (byte) 0xFF; diff --git a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java index efb08deea39..6c259772616 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/EqualToHavingSpec.java @@ -32,7 +32,7 @@ import java.util.Arrays; * The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value", * except that in SQL an aggregation is an expression instead of an aggregation name as in Druid. */ -public class EqualToHavingSpec implements HavingSpec +public class EqualToHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x3; diff --git a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java index 694c0a49e0a..27fec036c74 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/GreaterThanHavingSpec.java @@ -32,7 +32,7 @@ import java.util.Arrays; * The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value", * except that an aggregation in SQL is an expression instead of an aggregation name as in Druid. */ -public class GreaterThanHavingSpec implements HavingSpec +public class GreaterThanHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x4; diff --git a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java index b09a12b0f2d..8e20cbb08ef 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/HavingSpec.java @@ -22,10 +22,14 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import io.druid.data.input.Row; +import io.druid.segment.column.ValueType; + +import java.util.Map; /** * A "having" clause that filters aggregated/dimension value. This is similar to SQL's "having" - * clause. + * clause. HavingSpec objects are *not* thread-safe and must not be used simultaneously by multiple + * threads. 
*/ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonSubTypes(value = { @@ -46,6 +50,14 @@ public interface HavingSpec public static final HavingSpec NEVER = new NeverHavingSpec(); public static final HavingSpec ALWAYS = new AlwaysHavingSpec(); + /** + * Informs this HavingSpec that rows passed to "eval" will have a certain signature. Will be called + * before "eval". + * + * @param rowSignature signature of the rows + */ + public void setRowSignature(Map rowSignature); + /** * Evaluates if a given row satisfies the having spec. * diff --git a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java index 964a1f05d16..580bba13a0f 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/LessThanHavingSpec.java @@ -31,7 +31,7 @@ import java.util.Arrays; * The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value", * except that an aggregation in SQL is an expression instead of an aggregation name as in Druid. */ -public class LessThanHavingSpec implements HavingSpec +public class LessThanHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x5; diff --git a/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java index 77951d6386c..6098b73359d 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/NeverHavingSpec.java @@ -24,7 +24,7 @@ import io.druid.data.input.Row; /** * A "having" spec that always evaluates to false */ -public class NeverHavingSpec implements HavingSpec +public class NeverHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x1; diff --git a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java index 160e36eadd2..72568e02d29 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/NotHavingSpec.java @@ -22,13 +22,15 @@ package io.druid.query.groupby.having; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.data.input.Row; +import io.druid.segment.column.ValueType; import java.nio.ByteBuffer; +import java.util.Map; /** * The logical "not" operator for the "having" clause. 
*/ -public class NotHavingSpec implements HavingSpec +public class NotHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x6; @@ -46,6 +48,12 @@ public class NotHavingSpec implements HavingSpec return havingSpec; } + @Override + public void setRowSignature(Map rowSignature) + { + havingSpec.setRowSignature(rowSignature); + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java index 28a75fa6bbd..eed28e3023d 100644 --- a/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java +++ b/processing/src/main/java/io/druid/query/groupby/having/OrHavingSpec.java @@ -23,14 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; import io.druid.data.input.Row; +import io.druid.segment.column.ValueType; import java.nio.ByteBuffer; import java.util.List; +import java.util.Map; /** * The logical "or" operator for the "having" clause. */ -public class OrHavingSpec implements HavingSpec +public class OrHavingSpec extends BaseHavingSpec { private static final byte CACHE_KEY = 0x7; @@ -48,6 +50,14 @@ public class OrHavingSpec implements HavingSpec return havingSpecs; } + @Override + public void setRowSignature(Map rowSignature) + { + for (HavingSpec havingSpec : havingSpecs) { + havingSpec.setRowSignature(rowSignature); + } + } + @Override public boolean eval(Row row) { diff --git a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java index 7e1ab35f6b9..b17323fadc9 100644 --- a/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java +++ b/processing/src/main/java/io/druid/query/groupby/strategy/GroupByStrategyV2.java @@ -48,6 +48,7 @@ import io.druid.query.ResultMergeQueryRunner; import io.druid.query.aggregation.PostAggregator; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; +import io.druid.query.groupby.GroupByQueryHelper; import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2; import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2; import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2; @@ -207,6 +208,7 @@ public class GroupByStrategyV2 implements GroupByStrategy final Sequence results = GroupByRowProcessor.process( query, subqueryResult, + GroupByQueryHelper.rowSignatureFor(subquery), configSupplier.get(), mergeBufferPool, spillMapper diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java index 27e224a6137..0da65fe47cf 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java @@ -20,9 +20,8 @@ package io.druid.query.topn; import com.google.common.base.Function; -import com.google.common.collect.Lists; -import io.druid.query.Result; import io.druid.query.ColumnSelectorPlus; +import io.druid.query.Result; import io.druid.query.topn.types.TopNStrategyFactory; import io.druid.segment.Cursor; import io.druid.segment.DimensionHandlerUtils; @@ -47,19 +46,19 @@ public class TopNMapFn implements Function> @SuppressWarnings("unchecked") public Result apply(Cursor cursor) { - final ColumnSelectorPlus[] selectorPlusArray = 
DimensionHandlerUtils.createColumnSelectorPluses( + final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus( STRATEGY_FACTORY, - Lists.newArrayList(query.getDimensionSpec()), + query.getDimensionSpec(), cursor ); - if (selectorPlusArray[0].getSelector() == null) { + if (selectorPlus.getSelector() == null) { return null; } TopNParams params = null; try { - params = topNAlgorithm.makeInitParams(selectorPlusArray[0], cursor); + params = topNAlgorithm.makeInitParams(selectorPlus, cursor); TopNResultBuilder resultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, query); diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java index 2eb773126c4..d9a3b44478f 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -19,12 +19,13 @@ package io.druid.segment; -import io.druid.java.util.common.IAE; +import com.google.common.collect.ImmutableList; import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; +import io.druid.java.util.common.IAE; import io.druid.query.ColumnSelectorPlus; -import io.druid.query.dimension.DimensionSpec; import io.druid.query.dimension.ColumnSelectorStrategy; import io.druid.query.dimension.ColumnSelectorStrategyFactory; +import io.druid.query.dimension.DimensionSpec; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ValueType; @@ -63,6 +64,27 @@ public final class DimensionHandlerUtils return new StringDimensionHandler(dimensionName, multiValueHandling); } + /** + * Convenience function equivalent to calling + * {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton + * list of dimensionSpecs and then retrieving the only element in the returned array. + * + * @param The strategy type created by the provided strategy factory. + * @param strategyFactory A factory provided by query engines that generates type-handling strategies + * @param dimensionSpec column to generate a ColumnSelectorPlus object for + * @param cursor Used to create value selectors for columns. + * + * @return A ColumnSelectorPlus object + */ + public static ColumnSelectorPlus createColumnSelectorPlus( + ColumnSelectorStrategyFactory strategyFactory, + DimensionSpec dimensionSpec, + ColumnSelectorFactory cursor + ) + { + return createColumnSelectorPluses(strategyFactory, ImmutableList.of(dimensionSpec), cursor)[0]; + } + /** * Creates an array of ColumnSelectorPlus objects, selectors that handle type-specific operations within * query processing engines, using a strategy factory provided by the query engine. 
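The single-column helper above is shorthand for the array-based method whose javadoc continues below. A hedged sketch of the equivalence, using the ValueMatcher strategy factory that Filters (further down in this patch) passes in; "someColumn" and columnSelectorFactory are placeholders:

    // Sketch: both calls produce the same ColumnSelectorPlus.
    final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> viaHelper =
        DimensionHandlerUtils.createColumnSelectorPlus(
            ValueMatcherColumnSelectorStrategyFactory.instance(),
            DefaultDimensionSpec.of("someColumn"),
            columnSelectorFactory
        );

    final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> viaList =
        DimensionHandlerUtils.createColumnSelectorPluses(
            ValueMatcherColumnSelectorStrategyFactory.instance(),
            ImmutableList.of(DefaultDimensionSpec.of("someColumn")),
            columnSelectorFactory
        )[0];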
One ColumnSelectorPlus diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index 50ad5e65340..32d8153312c 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -22,8 +22,6 @@ package io.druid.segment; import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.MutableBitmap; import io.druid.query.dimension.DimensionSpec; -import io.druid.query.filter.DruidPredicateFactory; -import io.druid.query.filter.ValueMatcher; import io.druid.segment.data.Indexed; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; @@ -210,7 +208,7 @@ public interface DimensionIndexer, E * @param desc Descriptor object for this dimension within an IncrementalIndex * @return A new object that reads rows from currEntry */ - Object makeColumnValueSelector( + ColumnValueSelector makeColumnValueSelector( DimensionSpec spec, IncrementalIndexStorageAdapter.EntryHolder currEntry, IncrementalIndex.DimensionDesc desc @@ -308,46 +306,4 @@ public interface DimensionIndexer, E * @param factory bitmap factory */ void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory); - - - /** - * Return a ValueMatcher that accepts an EntryHolder containing the current TimeAndDims key and the array index of this - * indexer's dimension within the TimeAndDims key. - * - * The implementer should read the dimension array Object from the TimeAndDims key and cast it to the appropriate - * type, as described in the documentation for compareUnsortedEncodedArrays(). - * - * The returned ValueMatcher should match the dimension values against matchValue. - * - * See StringDimensionIndexer for a reference implementation. - * - * @param matchValue value to match on - * @param holder holds the current TimeAndDims key during row iteration - * @param dimIndex the array index of this indexer's dimension within the TimeAndDims key - * @return A ValueMatcher that matches a dimension value array from a TimeAndDims key against "matchValue" - */ - ValueMatcher makeIndexingValueMatcher(String matchValue, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex); - - /** - * Return a ValueMatcher that accepts an EntryHolder containing the current TimeAndDims key and the array index of this - * indexer's dimension within the TimeAndDims key. - * - * The implementer should read the dimension array Object from the TimeAndDims key and cast it to the appropriate - * type, as described in the documentation for compareUnsortedEncodedArrays(). - * - * Based on the type of the indexer, this method should get a predicate of the same type from the supplied - * predicateFactory. - * - * For example, a StringDimensionIndexer would call predicateFactory.makeStringPredicate(). - * - * The returned ValueMatcher should apply the generated predicate to the dimension values. - * - * See StringDimensionIndexer for a reference implementation. 
- * - * @param predicateFactory Factory object that can generate predicates for each supported dimension type - * @param holder holds the current TimeAndDims key during row iteration - * @param dimIndex the array index of this indexer's dimension within the TimeAndDims key - * @return A ValueMatcher that applies a predicate from the predicateFactory to the dimension values in the TimeAndDim keys - */ - ValueMatcher makeIndexingValueMatcher(DruidPredicateFactory predicateFactory, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex); } diff --git a/processing/src/main/java/io/druid/segment/NullDimensionSelector.java b/processing/src/main/java/io/druid/segment/NullDimensionSelector.java index f771afe408e..f5cbe064686 100644 --- a/processing/src/main/java/io/druid/segment/NullDimensionSelector.java +++ b/processing/src/main/java/io/druid/segment/NullDimensionSelector.java @@ -21,47 +21,14 @@ package io.druid.segment; import com.google.common.base.Strings; import io.druid.segment.data.IndexedInts; -import it.unimi.dsi.fastutil.ints.IntIterator; -import it.unimi.dsi.fastutil.ints.IntIterators; - -import java.io.IOException; +import io.druid.segment.data.ZeroIndexedInts; public class NullDimensionSelector implements DimensionSelector { - - private static final IndexedInts SINGLETON = new IndexedInts() { - @Override - public int size() { - return 1; - } - - @Override - public int get(int index) { - return 0; - } - - @Override - public IntIterator iterator() { - return IntIterators.singleton(0); - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("NullDimensionSelector does not support fill"); - } - - @Override - public void close() throws IOException - { - - } - }; - @Override public IndexedInts getRow() { - return SINGLETON; + return ZeroIndexedInts.instance(); } @Override diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index 005bf92c90a..835aa357b83 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -27,27 +27,19 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.io.Closer; - import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.granularity.QueryGranularity; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.math.expr.Expr; import io.druid.math.expr.Parser; -import io.druid.query.ColumnSelectorPlus; import io.druid.query.QueryInterruptedException; -import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BooleanFilter; -import io.druid.query.filter.DruidLongPredicate; -import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.RowOffsetMatcherFactory; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; -import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; -import io.druid.query.filter.ValueMatcherFactory; import io.druid.segment.column.BitmapIndex; import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; @@ -61,7 +53,6 @@ import 
io.druid.segment.data.IndexedInts; import io.druid.segment.data.Offset; import io.druid.segment.filter.AndFilter; import io.druid.segment.filter.BooleanValueMatcher; -import io.druid.segment.filter.Filters; import it.unimi.dsi.fastutil.ints.IntIterators; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -927,10 +918,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } else { return new QueryableIndexBaseCursor() { - CursorOffsetHolderValueMatcherFactory valueMatcherFactory = new CursorOffsetHolderValueMatcherFactory( - storageAdapter, - this - ); RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory( cursorOffsetHolder, descending @@ -942,7 +929,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter if (postFilter instanceof BooleanFilter) { filterMatcher = ((BooleanFilter) postFilter).makeMatcher( bitmapIndexSelector, - valueMatcherFactory, + this, rowOffsetMatcherFactory ); } else { @@ -950,7 +937,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter filterMatcher = rowOffsetMatcherFactory.makeRowOffsetMatcher(postFilter.getBitmapIndex( bitmapIndexSelector)); } else { - filterMatcher = postFilter.makeMatcher(valueMatcherFactory); + filterMatcher = postFilter.makeMatcher(this); } } } @@ -1040,83 +1027,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter } } - private static class CursorOffsetHolderValueMatcherFactory implements ValueMatcherFactory - { - private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY = - new ValueMatcherColumnSelectorStrategyFactory(); - - private final StorageAdapter storageAdapter; - private final ColumnSelectorFactory cursor; - private final List availableMetrics; - - public CursorOffsetHolderValueMatcherFactory( - StorageAdapter storageAdapter, - ColumnSelectorFactory cursor - ) - { - this.storageAdapter = storageAdapter; - this.cursor = cursor; - this.availableMetrics = Lists.newArrayList(storageAdapter.getAvailableMetrics()); - } - - @Override - public ValueMatcher makeValueMatcher(String dimension, final String value) - { - if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) { - if (getTypeForDimension(dimension) == ValueType.LONG) { - return Filters.getLongValueMatcher( - cursor.makeLongColumnSelector(dimension), - value - ); - } - } - - ColumnSelectorPlus[] selector = - DimensionHandlerUtils.createColumnSelectorPluses( - STRATEGY_FACTORY, - ImmutableList.of(DefaultDimensionSpec.of(dimension)), - cursor - ); - - final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); - return strategy.getValueMatcher(dimension, cursor, value); - } - - @Override - public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory) - { - if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) { - if (getTypeForDimension(dimension) == ValueType.LONG) { - return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate()); - } - } - - ColumnSelectorPlus[] selector = - DimensionHandlerUtils.createColumnSelectorPluses( - STRATEGY_FACTORY, - ImmutableList.of(DefaultDimensionSpec.of(dimension)), - cursor - ); - - final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy(); - return strategy.getValueMatcher(dimension, cursor, predicateFactory); - } - - private ValueMatcher makeLongValueMatcher(String dimension, final DruidLongPredicate predicate) - { 
- return Filters.getLongPredicateMatcher( - cursor.makeLongColumnSelector(dimension), - predicate - ); - } - - private ValueType getTypeForDimension(String dimension) - { - ColumnCapabilities capabilities = cursor.getColumnCapabilities(dimension); - return capabilities == null ? ValueType.STRING : capabilities.getType(); - } - } - private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory { private final CursorOffsetHolder holder; diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index c869c98dadc..ae450b066f4 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -20,23 +20,18 @@ package io.druid.segment; import com.google.common.base.Function; -import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; - -import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; import io.druid.collections.bitmap.BitmapFactory; import io.druid.collections.bitmap.MutableBitmap; +import io.druid.data.input.impl.DimensionSchema.MultiValueHandling; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; -import io.druid.query.filter.DruidPredicateFactory; -import io.druid.query.filter.ValueMatcher; import io.druid.segment.data.Indexed; import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedIterable; -import io.druid.segment.filter.BooleanValueMatcher; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; import it.unimi.dsi.fastutil.ints.IntArrayList; @@ -379,7 +374,7 @@ public class StringDimensionIndexer implements DimensionIndexer -1) { - if (nullId < maxId) { - valsTmp = IntLists.singleton(nullId); + if (indices == null || indices.length == 0) { + final int nullId = getEncodedValue(null, false); + if (nullId > -1) { + if (nullId < maxId) { + valsTmp = IntLists.singleton(nullId); + } else { + valsTmp = IntLists.EMPTY_LIST; + } } - } else if (indices != null && indices.length > 0) { + } + + if (valsTmp == null && indices != null && indices.length > 0) { valsTmp = new IntArrayList(indices.length); for (int i = 0; i < indices.length; i++) { int id = indices[i]; @@ -534,79 +535,6 @@ public class StringDimensionIndexer implements DimensionIndexer= dims.length) { - return matchOnNull; - } - - int[] dimsInt = (int[]) dims[dimIndex]; - if (dimsInt == null || dimsInt.length == 0) { - return matchOnNull; - } - - for (int i = 0; i < dimsInt.length; i++) { - if (dimsInt[i] == encodedVal) { - return true; - } - } - return false; - } - }; - } - - @Override - public ValueMatcher makeIndexingValueMatcher( - final DruidPredicateFactory predicateFactory, - final IncrementalIndexStorageAdapter.EntryHolder holder, - final int dimIndex - ) - { - final Predicate predicate = predicateFactory.makeStringPredicate(); - final boolean matchOnNull = predicate.apply(null); - return new ValueMatcher() - { - @Override - public boolean matches() - { - Object[] dims = holder.getKey().getDims(); - if (dimIndex >= dims.length) { - return matchOnNull; - } - - int[] dimsInt = (int[]) dims[dimIndex]; - if (dimsInt == null || dimsInt.length == 0) { - return matchOnNull; - } - - for (int i = 0; i < dimsInt.length; i++) { - 
String finalDimVal = getActualValue(dimsInt[i], false); - if (predicate.apply(finalDimVal)) { - return true; - } - } - return false; - } - }; - } - private SortedDimensionDictionary sortedLookup() { return sortedLookup == null ? sortedLookup = dimLookup.sort() : sortedLookup; diff --git a/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java b/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java new file mode 100644 index 00000000000..172c74e307f --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/RangeIndexedInts.java @@ -0,0 +1,91 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import com.google.common.base.Preconditions; +import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.ints.IntIterators; + +import java.io.IOException; + +/** + * An IndexedInts that always returns [0, 1, ..., N]. + */ +public class RangeIndexedInts implements IndexedInts +{ + private static final int CACHE_LIMIT = 8; + private static final RangeIndexedInts[] CACHE = new RangeIndexedInts[CACHE_LIMIT]; + + static { + for (int i = 0; i < CACHE_LIMIT; i++) { + CACHE[i] = new RangeIndexedInts(i); + } + } + + private final int size; + + private RangeIndexedInts(int size) + { + this.size = size; + } + + public static RangeIndexedInts create(final int size) + { + Preconditions.checkArgument(size >= 0, "size >= 0"); + if (size < CACHE_LIMIT) { + return CACHE[size]; + } else { + return new RangeIndexedInts(size); + } + } + + @Override + public int size() + { + return size; + } + + @Override + public int get(int index) + { + if (index < 0 || index >= size) { + throw new IndexOutOfBoundsException("index: " + index); + } + return index; + } + + @Override + public void fill(int index, int[] toFill) + { + throw new UnsupportedOperationException("fill"); + } + + @Override + public IntIterator iterator() + { + return IntIterators.fromTo(0, size); + } + + @Override + public void close() throws IOException + { + + } +} diff --git a/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java b/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java new file mode 100644 index 00000000000..1ee254da723 --- /dev/null +++ b/processing/src/main/java/io/druid/segment/data/ZeroIndexedInts.java @@ -0,0 +1,75 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
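RangeIndexedInts above, together with ZeroIndexedInts defined next, replaces the anonymous IndexedInts implementations that previously lived in NullDimensionSelector and IncrementalIndex. A short behavioural sketch based on the code above:

    // RangeIndexedInts.create(n) is the identity row [0, 1, ..., n-1]; sizes below CACHE_LIMIT (8)
    // share cached instances, so equal small sizes return the same object.
    final IndexedInts range = RangeIndexedInts.create(3);
    range.size();  // 3
    range.get(2);  // 2

    // ZeroIndexedInts is the single-row, single-zero analogue now returned by NullDimensionSelector.
    ZeroIndexedInts.instance().get(0);  // 0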
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.data; + +import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.ints.IntIterators; + +import java.io.IOException; + +/** + * An IndexedInts that always returns a row containing a single zero. + */ +public class ZeroIndexedInts implements IndexedInts +{ + private final static ZeroIndexedInts INSTANCE = new ZeroIndexedInts(); + + private ZeroIndexedInts() + { + // Singleton. + } + + public static ZeroIndexedInts instance() + { + return INSTANCE; + } + + @Override + public int size() + { + return 1; + } + + @Override + public int get(int index) + { + // Skip range check in production, assume "index" was 0 like it really should have been. + assert index == 0; + return 0; + } + + @Override + public void fill(int index, int[] toFill) + { + throw new UnsupportedOperationException("fill"); + } + + @Override + public IntIterator iterator() + { + return IntIterators.singleton(0); + } + + @Override + public void close() throws IOException + { + + } +} diff --git a/processing/src/main/java/io/druid/segment/filter/AndFilter.java b/processing/src/main/java/io/druid/segment/filter/AndFilter.java index 5365c151a80..a1b525a67f7 100644 --- a/processing/src/main/java/io/druid/segment/filter/AndFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/AndFilter.java @@ -27,7 +27,7 @@ import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; import io.druid.query.filter.RowOffsetMatcherFactory; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; import java.util.ArrayList; import java.util.List; @@ -73,7 +73,7 @@ public class AndFilter implements BooleanFilter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { if (filters.size() == 0) { return new BooleanValueMatcher(false); @@ -90,7 +90,7 @@ public class AndFilter implements BooleanFilter @Override public ValueMatcher makeMatcher( BitmapIndexSelector selector, - ValueMatcherFactory valueMatcherFactory, + ColumnSelectorFactory columnSelectorFactory, RowOffsetMatcherFactory rowOffsetMatcherFactory ) { @@ -101,7 +101,7 @@ public class AndFilter implements BooleanFilter if (filter.supportsBitmapIndex(selector)) { bitmaps.add(filter.getBitmapIndex(selector)); } else { - ValueMatcher matcher = filter.makeMatcher(valueMatcherFactory); + ValueMatcher matcher = filter.makeMatcher(columnSelectorFactory); matchers.add(matcher); } } diff --git a/processing/src/main/java/io/druid/segment/filter/BoundFilter.java b/processing/src/main/java/io/druid/segment/filter/BoundFilter.java index 5318d21700d..0f8bb42edf3 100644 --- a/processing/src/main/java/io/druid/segment/filter/BoundFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/BoundFilter.java @@ -29,8 +29,8 @@ import io.druid.query.filter.DruidLongPredicate; import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import 
io.druid.query.filter.ValueMatcherFactory; import io.druid.query.ordering.StringComparators; +import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.column.BitmapIndex; import java.util.Comparator; @@ -132,9 +132,9 @@ public class BoundFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - return factory.makeValueMatcher(boundDimFilter.getDimension(), getPredicateFactory()); + return Filters.makeValueMatcher(factory, boundDimFilter.getDimension(), getPredicateFactory()); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java b/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java index c3146038ea4..6af81139283 100644 --- a/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/DimensionPredicateFilter.java @@ -28,7 +28,7 @@ import io.druid.query.filter.DruidLongPredicate; import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; /** */ @@ -93,9 +93,9 @@ public class DimensionPredicateFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - return factory.makeValueMatcher(dimension, predicateFactory); + return Filters.makeValueMatcher(factory, dimension, predicateFactory); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/Filters.java b/processing/src/main/java/io/druid/segment/filter/Filters.java index eea3d1cf089..aa94464b847 100644 --- a/processing/src/main/java/io/druid/segment/filter/Filters.java +++ b/processing/src/main/java/io/druid/segment/filter/Filters.java @@ -28,15 +28,23 @@ import com.google.common.collect.Lists; import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.common.guava.GuavaUtils; import io.druid.java.util.common.guava.FunctionalIterable; +import io.druid.query.ColumnSelectorPlus; import io.druid.query.Query; +import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.DimFilter; import io.druid.query.filter.DruidLongPredicate; +import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; +import io.druid.query.filter.ValueMatcherColumnSelectorStrategy; +import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory; +import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.LongColumnSelector; import io.druid.segment.column.BitmapIndex; +import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; @@ -89,6 +97,84 @@ public class Filters return dimFilter == null ? null : dimFilter.toFilter(); } + /** + * Create a ValueMatcher that compares row values to the provided string. + * + * An implementation of this method should be able to handle dimensions of various types. + * + * @param columnSelectorFactory Selector for columns. + * @param columnName The column to filter. + * @param value The value to match against, represented as a String. 
+ * + * @return An object that matches row values on the provided value. + */ + public static ValueMatcher makeValueMatcher( + final ColumnSelectorFactory columnSelectorFactory, + final String columnName, + final String value + ) + { + final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName); + + // This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns. + if (capabilities != null && capabilities.getType() == ValueType.LONG) { + return getLongValueMatcher( + columnSelectorFactory.makeLongColumnSelector(columnName), + value + ); + } + + final ColumnSelectorPlus selector = + DimensionHandlerUtils.createColumnSelectorPlus( + ValueMatcherColumnSelectorStrategyFactory.instance(), + DefaultDimensionSpec.of(columnName), + columnSelectorFactory + ); + + return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), value); + } + + /** + * Create a ValueMatcher that applies a predicate to row values. + * + * The caller provides a predicate factory that can create a predicate for each value type supported by Druid. + * See {@link DruidPredicateFactory} for more information. + * + * When creating the ValueMatcher, the ValueMatcherFactory implementation should decide what type of predicate + * to create from the predicate factory based on the ValueType of the specified dimension. + * + * @param columnSelectorFactory Selector for columns. + * @param columnName The column to filter. + * @param predicateFactory Predicate factory + * + * @return An object that applies a predicate to row values + */ + public static ValueMatcher makeValueMatcher( + final ColumnSelectorFactory columnSelectorFactory, + final String columnName, + final DruidPredicateFactory predicateFactory + ) + { + final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName); + + // This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns. 
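// Usage sketch (commentary, not part of this method): filters that previously needed a
// ValueMatcherFactory now hand their ColumnSelectorFactory straight to these helpers, as
// SelectorFilter and BoundFilter do later in this patch. Roughly, with "dim", "value" and
// somePredicateFactory as placeholders:
//
//   ValueMatcher equalityMatcher  = Filters.makeValueMatcher(columnSelectorFactory, "dim", "value");
//   ValueMatcher predicateMatcher = Filters.makeValueMatcher(columnSelectorFactory, "dim", somePredicateFactory);
//
// LONG columns short-circuit to getLongValueMatcher / getLongPredicateMatcher; everything else
// goes through a ValueMatcherColumnSelectorStrategy obtained via createColumnSelectorPlus.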
+ if (capabilities != null && capabilities.getType() == ValueType.LONG) { + return getLongPredicateMatcher( + columnSelectorFactory.makeLongColumnSelector(columnName), + predicateFactory.makeLongPredicate() + ); + } + + final ColumnSelectorPlus selector = + DimensionHandlerUtils.createColumnSelectorPlus( + ValueMatcherColumnSelectorStrategyFactory.instance(), + DefaultDimensionSpec.of(columnName), + columnSelectorFactory + ); + + return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), predicateFactory); + } + public static ImmutableBitmap allFalse(final BitmapIndexSelector selector) { return selector.getBitmapFactory().makeEmptyImmutableBitmap(); diff --git a/processing/src/main/java/io/druid/segment/filter/InFilter.java b/processing/src/main/java/io/druid/segment/filter/InFilter.java index f650260b024..650b1b8b879 100644 --- a/processing/src/main/java/io/druid/segment/filter/InFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/InFilter.java @@ -31,7 +31,7 @@ import io.druid.query.filter.DruidLongPredicate; import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; import java.util.Set; @@ -83,9 +83,9 @@ public class InFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - return factory.makeValueMatcher(dimension, getPredicateFactory()); + return Filters.makeValueMatcher(factory, dimension, getPredicateFactory()); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java b/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java index 85084e82f44..df22ebaea40 100644 --- a/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/JavaScriptFilter.java @@ -25,7 +25,7 @@ import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.JavaScriptDimFilter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; import org.mozilla.javascript.Context; public class JavaScriptFilter implements Filter @@ -64,10 +64,10 @@ public class JavaScriptFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { // suboptimal, since we need create one context per call to predicate.apply() - return factory.makeValueMatcher(dimension, predicateFactory); + return Filters.makeValueMatcher(factory, dimension, predicateFactory); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/LikeFilter.java b/processing/src/main/java/io/druid/segment/filter/LikeFilter.java index 00ba9472854..934b48929e1 100644 --- a/processing/src/main/java/io/druid/segment/filter/LikeFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/LikeFilter.java @@ -20,14 +20,13 @@ package io.druid.segment.filter; import com.google.common.base.Strings; - import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.query.extraction.ExtractionFn; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.LikeDimFilter; import io.druid.query.filter.ValueMatcher; -import 
io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.column.BitmapIndex; import io.druid.segment.data.Indexed; @@ -131,9 +130,9 @@ public class LikeFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - return factory.makeValueMatcher(dimension, likeMatcher.predicateFactory(extractionFn)); + return Filters.makeValueMatcher(factory, dimension, likeMatcher.predicateFactory(extractionFn)); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/NotFilter.java b/processing/src/main/java/io/druid/segment/filter/NotFilter.java index 0e3c7311824..acd51a32a8e 100644 --- a/processing/src/main/java/io/druid/segment/filter/NotFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/NotFilter.java @@ -23,7 +23,7 @@ import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; /** */ @@ -48,7 +48,7 @@ public class NotFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { final ValueMatcher baseMatcher = baseFilter.makeMatcher(factory); diff --git a/processing/src/main/java/io/druid/segment/filter/OrFilter.java b/processing/src/main/java/io/druid/segment/filter/OrFilter.java index ba288b38164..f870a5cb8ab 100644 --- a/processing/src/main/java/io/druid/segment/filter/OrFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/OrFilter.java @@ -27,7 +27,7 @@ import io.druid.query.filter.BooleanFilter; import io.druid.query.filter.Filter; import io.druid.query.filter.RowOffsetMatcherFactory; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; import java.util.ArrayList; import java.util.List; @@ -67,7 +67,7 @@ public class OrFilter implements BooleanFilter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { final ValueMatcher[] matchers = new ValueMatcher[filters.size()]; @@ -80,7 +80,7 @@ public class OrFilter implements BooleanFilter @Override public ValueMatcher makeMatcher( BitmapIndexSelector selector, - ValueMatcherFactory valueMatcherFactory, + ColumnSelectorFactory columnSelectorFactory, RowOffsetMatcherFactory rowOffsetMatcherFactory ) { @@ -91,7 +91,7 @@ public class OrFilter implements BooleanFilter if (filter.supportsBitmapIndex(selector)) { bitmaps.add(filter.getBitmapIndex(selector)); } else { - ValueMatcher matcher = filter.makeMatcher(valueMatcherFactory); + ValueMatcher matcher = filter.makeMatcher(columnSelectorFactory); matchers.add(matcher); } } diff --git a/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java b/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java index 77b98fd40b6..9c3654d282b 100644 --- a/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/SelectorFilter.java @@ -23,7 +23,7 @@ import io.druid.collections.bitmap.ImmutableBitmap; import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; 
-import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; /** */ @@ -48,9 +48,9 @@ public class SelectorFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - return factory.makeValueMatcher(dimension, value); + return Filters.makeValueMatcher(factory, dimension, value); } @Override diff --git a/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java b/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java index ff28649aac8..dd5d530971d 100644 --- a/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java +++ b/processing/src/main/java/io/druid/segment/filter/SpatialFilter.java @@ -27,7 +27,7 @@ import io.druid.query.filter.DruidLongPredicate; import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; +import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.incremental.SpatialDimensionRowTransformer; /** @@ -54,9 +54,10 @@ public class SpatialFilter implements Filter } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { - return factory.makeValueMatcher( + return Filters.makeValueMatcher( + factory, dimension, new DruidPredicateFactory() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 0b5210febfa..3660099c300 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -38,15 +38,12 @@ import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.SpatialDimensionSchema; import io.druid.granularity.QueryGranularity; -import io.druid.math.expr.Evals; -import io.druid.math.expr.Expr; -import io.druid.math.expr.Parser; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.dimension.DimensionSpec; -import io.druid.query.extraction.ExtractionFn; +import io.druid.query.groupby.RowBasedColumnSelectorFactory; import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.DimensionHandler; import io.druid.segment.DimensionHandlerUtils; @@ -61,19 +58,15 @@ import io.druid.segment.column.Column; import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.column.ValueType; -import io.druid.segment.data.IndexedInts; import io.druid.segment.serde.ComplexMetricExtractor; import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; -import it.unimi.dsi.fastutil.ints.IntIterator; -import it.unimi.dsi.fastutil.ints.IntIterators; import org.joda.time.DateTime; import org.joda.time.Interval; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; -import java.io.IOException; import java.lang.reflect.Array; import java.util.ArrayList; import java.util.Arrays; @@ -111,46 +104,22 @@ public abstract class IncrementalIndex implements Iterable, public static ColumnSelectorFactory 
makeColumnSelectorFactory( final AggregatorFactory agg, final Supplier in, - final boolean deserializeComplexMetrics, - final Map columnCapabilities + final boolean deserializeComplexMetrics ) { + final RowBasedColumnSelectorFactory baseSelectorFactory = RowBasedColumnSelectorFactory.create(in, null); return new ColumnSelectorFactory() { @Override public LongColumnSelector makeLongColumnSelector(final String columnName) { - if (columnName.equals(Column.TIME_COLUMN_NAME)) { - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getTimestampFromEpoch(); - } - }; - } - return new LongColumnSelector() - { - @Override - public long get() - { - return in.get().getLongMetric(columnName); - } - }; + return baseSelectorFactory.makeLongColumnSelector(columnName); } @Override public FloatColumnSelector makeFloatColumnSelector(final String columnName) { - return new FloatColumnSelector() - { - @Override - public float get() - { - return in.get().getFloatMetric(columnName); - } - }; + return baseSelectorFactory.makeFloatColumnSelector(columnName); } @Override @@ -158,20 +127,7 @@ public abstract class IncrementalIndex implements Iterable, { final String typeName = agg.getTypeName(); - final ObjectColumnSelector rawColumnSelector = new ObjectColumnSelector() - { - @Override - public Class classOfObject() - { - return Object.class; - } - - @Override - public Object get() - { - return in.get().getRaw(column); - } - }; + final ObjectColumnSelector rawColumnSelector = baseSelectorFactory.makeObjectColumnSelector(column); if ((Enums.getIfPresent(ValueType.class, typeName.toUpperCase()).isPresent() && !typeName.equalsIgnoreCase(ValueType.COMPLEX.name())) || !deserializeComplexMetrics) { @@ -201,129 +157,21 @@ public abstract class IncrementalIndex implements Iterable, } @Override - public ColumnCapabilities getColumnCapabilities(String columnName) + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) { - // This ColumnSelectorFactory implementation has no knowledge of column capabilities. - // However, this method may still be called by FilteredAggregatorFactory's ValueMatcherFactory - // to check column types. - // If column capabilities are not available, return null, the caller will assume default types in that case. - return columnCapabilities == null ? null : columnCapabilities.get(columnName); + return baseSelectorFactory.makeDimensionSelector(dimensionSpec); } @Override - public DimensionSelector makeDimensionSelector( - DimensionSpec dimensionSpec - ) + public ColumnCapabilities getColumnCapabilities(String columnName) { - return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec)); - } - - private DimensionSelector makeDimensionSelectorUndecorated( - DimensionSpec dimensionSpec - ) - { - final String dimension = dimensionSpec.getDimension(); - final ExtractionFn extractionFn = dimensionSpec.getExtractionFn(); - - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - final List dimensionValues = in.get().getDimension(dimension); - final int dimensionValuesSize = dimensionValues != null ? 
dimensionValues.size() : 0; - - return new IndexedInts() - { - @Override - public int size() - { - return dimensionValuesSize; - } - - @Override - public int get(int index) - { - if (index < 0 || index >= dimensionValuesSize) { - throw new IndexOutOfBoundsException("index: " + index); - } - return index; - } - - @Override - public IntIterator iterator() - { - return IntIterators.fromTo(0, dimensionValuesSize); - } - - @Override - public void close() throws IOException - { - - } - - @Override - public void fill(int index, int[] toFill) - { - throw new UnsupportedOperationException("fill not supported"); - } - }; - } - - @Override - public int getValueCardinality() - { - return DimensionSelector.CARDINALITY_UNKNOWN; - } - - @Override - public String lookupName(int id) - { - final String value = in.get().getDimension(dimension).get(id); - return extractionFn == null ? value : extractionFn.apply(value); - } - - @Override - public int lookupId(String name) - { - if (extractionFn != null) { - throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function"); - } - return in.get().getDimension(dimension).indexOf(name); - } - }; + return baseSelectorFactory.getColumnCapabilities(columnName); } @Override public NumericColumnSelector makeMathExpressionSelector(String expression) { - final Expr parsed = Parser.parse(expression); - - final List required = Parser.findRequiredBindings(parsed); - final Map> values = Maps.newHashMapWithExpectedSize(required.size()); - - for (final String columnName : required) { - values.put( - columnName, new Supplier() - { - @Override - public Number get() - { - return Evals.toNumber(in.get().getRaw(columnName)); - } - } - ); - } - final Expr.ObjectBinding binding = Parser.withSuppliers(values); - - return new NumericColumnSelector() - { - @Override - public Number get() - { - return parsed.eval(binding).numericValue(); - } - }; + return baseSelectorFactory.makeMathExpressionSelector(expression); } }; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 443abed638a..f52739356f5 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -20,27 +20,22 @@ package io.druid.segment.incremental; import com.google.common.base.Function; -import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; - import io.druid.granularity.QueryGranularity; -import io.druid.math.expr.Expr; -import io.druid.math.expr.Parser; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; +import io.druid.math.expr.Expr; +import io.druid.math.expr.Parser; import io.druid.query.QueryInterruptedException; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; import io.druid.query.extraction.ExtractionFn; -import io.druid.query.filter.DruidLongPredicate; -import io.druid.query.filter.DruidPredicateFactory; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; import io.druid.segment.Capabilities; import io.druid.segment.Cursor; 
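The IncrementalIndex.makeColumnSelectorFactory rewrite above now simply wraps RowBasedColumnSelectorFactory, so the in-memory indexing path and the having/filter paths share one row-reading implementation. A hedged sketch of what a caller gets; aggregatorFactory and someInputRow are placeholders:

    // Sketch: selectors read whatever row is currently held by the supplier.
    final SettableSupplier<InputRow> rowSupplier = new SettableSupplier<>();
    final ColumnSelectorFactory selectorFactory =
        IncrementalIndex.makeColumnSelectorFactory(aggregatorFactory, rowSupplier, true);

    rowSupplier.set(someInputRow);
    selectorFactory.makeLongColumnSelector("count").get();  // someInputRow.getLongMetric("count")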
import io.druid.segment.DimensionHandler; @@ -63,11 +58,9 @@ import io.druid.segment.column.ValueType; import io.druid.segment.data.Indexed; import io.druid.segment.data.ListIndexed; import io.druid.segment.filter.BooleanValueMatcher; -import io.druid.segment.filter.Filters; import org.joda.time.DateTime; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -245,13 +238,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter EntryHolder currEntry = new EntryHolder(); @Override - public Cursor apply(@Nullable final Long input) + public Cursor apply(final Long input) { final long timeStart = Math.max(input, actualInterval.getStartMillis()); return new Cursor() { - private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this, currEntry); + private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this); private Iterator> baseIter; private Iterable> cursorIterable; private boolean emptyRange; @@ -615,11 +608,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter ); } - private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor, final EntryHolder holder) + private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor) { return filter == null ? new BooleanValueMatcher(true) - : filter.makeMatcher(new CursorAndEntryHolderValueMatcherFactory(cursor, holder)); + : filter.makeMatcher(cursor); } public static class EntryHolder @@ -647,84 +640,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } } - - private class CursorAndEntryHolderValueMatcherFactory implements ValueMatcherFactory - { - private final EntryHolder holder; - private final Cursor cursor; - - public CursorAndEntryHolderValueMatcherFactory( - Cursor cursor, - EntryHolder holder - ) - { - this.cursor = cursor; - this.holder = holder; - } - - @Override - public ValueMatcher makeValueMatcher(String dimension, final String originalValue) - { - IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); - if (dimensionDesc == null) { - // filtering on long metrics and __time is supported as well - final Integer metricIndexInt = index.getMetricIndex(dimension); - if (metricIndexInt != null || dimension.equals(Column.TIME_COLUMN_NAME)) { - ValueType type = getTypeForDimension(dimension); - switch (type) { - case LONG: - return Filters.getLongValueMatcher(cursor.makeLongColumnSelector(dimension), originalValue); - default: - return new BooleanValueMatcher(Strings.isNullOrEmpty(originalValue)); - } - } else { - return new BooleanValueMatcher(Strings.isNullOrEmpty(originalValue)); - } - } else { - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - final int dimIndex = dimensionDesc.getIndex(); - return indexer.makeIndexingValueMatcher(originalValue, holder, dimIndex); - } - } - - @Override - public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory) - { - IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); - if (dimensionDesc == null) { - // filtering on long metrics and __time is supported as well - final Integer metricIndexInt = index.getMetricIndex(dimension); - if (metricIndexInt != null || dimension.equals(Column.TIME_COLUMN_NAME)) { - ValueType type = getTypeForDimension(dimension); - switch (type) { - case LONG: - return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate()); - default: - return new 
BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null)); - } - } else { - return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null)); - } - } else { - final DimensionIndexer indexer = dimensionDesc.getIndexer(); - final int dimIndex = dimensionDesc.getIndex(); - return indexer.makeIndexingValueMatcher(predicateFactory, holder, dimIndex); - } - } - - // for long metrics and __time - private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate) - { - return Filters.getLongPredicateMatcher(cursor.makeLongColumnSelector(dimension), predicate); - } - - private ValueType getTypeForDimension(String dimension) - { - ColumnCapabilities capabilities = index.getCapabilities(dimension); - return capabilities == null ? ValueType.STRING : capabilities.getType(); - } - } - @Override public Metadata getMetadata() { diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index a0f654b4051..2100e8ff0df 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -177,8 +177,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory( agg, rowSupplier, - deserializeComplexMetrics, - getColumnCapabilities() + deserializeComplexMetrics ); selectors.put( @@ -229,7 +228,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; getAggs()[i] = agg.factorizeBuffered( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, getColumnCapabilities()) + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) ); } rowContainer.set(null); diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 4604b9ce8e0..a278ec19bf6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -158,7 +158,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex for (AggregatorFactory agg : metrics) { selectors.put( agg.getName(), - new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, getColumnCapabilities())) + new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) ); } diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index d00ea9ca573..862405a45cf 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -91,6 +91,7 @@ import io.druid.query.filter.OrDimFilter; import io.druid.query.filter.RegexDimFilter; import io.druid.query.filter.SearchQueryDimFilter; import io.druid.query.filter.SelectorDimFilter; +import io.druid.query.groupby.having.BaseHavingSpec; import io.druid.query.groupby.having.DimFilterHavingSpec; import io.druid.query.groupby.having.DimensionSelectorHavingSpec; import 
io.druid.query.groupby.having.EqualToHavingSpec; @@ -3366,7 +3367,7 @@ public class GroupByQueryRunnerTest .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) .setHavingSpec( new OrHavingSpec( - ImmutableList.of( + ImmutableList.of( new GreaterThanHavingSpec("rows", 2L), new EqualToHavingSpec("idx", 217L) ) @@ -3495,7 +3496,7 @@ public class GroupByQueryRunnerTest .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) .setHavingSpec( new OrHavingSpec( - ImmutableList.of( + ImmutableList.of( new GreaterThanHavingSpec("rows", 2L), new EqualToHavingSpec("idx", 217L) ) @@ -3604,7 +3605,7 @@ public class GroupByQueryRunnerTest .setGranularity(new PeriodGranularity(new Period("P1M"), null, null)) .setHavingSpec( new OrHavingSpec( - ImmutableList.of( + ImmutableList.of( new GreaterThanHavingSpec("rows_times_10", 20L), new EqualToHavingSpec("idx", 217L) ) @@ -4655,7 +4656,7 @@ public class GroupByQueryRunnerTest ) ) .setHavingSpec( - new HavingSpec() + new BaseHavingSpec() { @Override public boolean eval(Row row) @@ -4926,7 +4927,7 @@ public class GroupByQueryRunnerTest ) ) .setHavingSpec( - new HavingSpec() + new BaseHavingSpec() { @Override public boolean eval(Row row) diff --git a/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java b/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java new file mode 100644 index 00000000000..89ea9461d0a --- /dev/null +++ b/processing/src/test/java/io/druid/query/groupby/having/DimFilterHavingSpecTest.java @@ -0,0 +1,122 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.query.groupby.having; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.MapBasedRow; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.filter.SelectorDimFilter; +import io.druid.segment.column.ValueType; +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; +import org.junit.rules.ExpectedException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class DimFilterHavingSpecTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testSimple() + { + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "bar", null)); + havingSpec.setRowSignature(null); + + Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.of("foo", "bar")))); + Assert.assertFalse(havingSpec.eval(new MapBasedRow(0, ImmutableMap.of("foo", "baz")))); + } + + @Test + public void testRowSignature() + { + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null)); + havingSpec.setRowSignature(ImmutableMap.of("foo", ValueType.LONG)); + + Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.of("foo", 1L)))); + Assert.assertFalse(havingSpec.eval(new MapBasedRow(0, ImmutableMap.of("foo", 2L)))); + } + + @Test(timeout = 60_000L) + public void testConcurrentUsage() throws Exception + { + final ExecutorService exec = Executors.newFixedThreadPool(2); + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null)); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < 2; i++) { + final MapBasedRow row = new MapBasedRow(0, ImmutableMap.of("foo", String.valueOf(i))); + futures.add( + exec.submit( + new Runnable() + { + @Override + public void run() + { + havingSpec.setRowSignature(null); + while (!Thread.interrupted()) { + havingSpec.eval(row); + } + } + } + ) + ); + } + + expectedException.expect(ExecutionException.class); + expectedException.expectCause(CoreMatchers.instanceOf(IllegalStateException.class)); + expectedException.expectCause( + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("concurrent 'eval' calls not permitted")) + ); + + try { + for (Future future : futures) { + future.get(); + } + } + finally { + exec.shutdownNow(); + } + + // Not reached + Assert.assertTrue(false); + } + + @Test + public void testSerde() throws Exception + { + final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null)); + final ObjectMapper objectMapper = new DefaultObjectMapper(); + Assert.assertEquals( + havingSpec, + objectMapper.readValue(objectMapper.writeValueAsBytes(havingSpec), HavingSpec.class) + ); + } +} diff --git a/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java b/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java index acf872bbe88..044596937ad 100644 --- a/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java +++ b/processing/src/test/java/io/druid/query/groupby/having/HavingSpecTest.java @@ -44,10 +44,10 @@ public class HavingSpecTest @Test public void 
testHavingClauseSerde() throws Exception { - List havings = Arrays.asList( + List havings = Arrays.asList( new GreaterThanHavingSpec("agg", Double.valueOf(1.3)), new OrHavingSpec( - Arrays.asList( + Arrays.asList( new LessThanHavingSpec("lessAgg", Long.valueOf(1L)), new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2))) ) @@ -152,7 +152,7 @@ public class HavingSpecTest assertFalse(spec.eval(getTestRow(Long.MAX_VALUE))); } - private static class CountingHavingSpec implements HavingSpec { + private static class CountingHavingSpec extends BaseHavingSpec { private final AtomicInteger counter; private final boolean value; diff --git a/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java b/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java index 76e192848dd..97916b253c2 100644 --- a/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/BaseFilterTest.java @@ -23,8 +23,11 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import io.druid.collections.bitmap.ImmutableBitmap; +import io.druid.common.guava.SettableSupplier; import io.druid.common.utils.JodaUtils; import io.druid.data.input.InputRow; import io.druid.granularity.QueryGranularities; @@ -39,8 +42,8 @@ import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.DimFilter; import io.druid.query.filter.Filter; import io.druid.query.filter.ValueMatcher; -import io.druid.query.filter.ValueMatcherFactory; -import io.druid.query.groupby.RowBasedValueMatcherFactory; +import io.druid.query.groupby.RowBasedColumnSelectorFactory; +import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; import io.druid.segment.IndexBuilder; @@ -51,6 +54,7 @@ import io.druid.segment.QueryableIndexStorageAdapter; import io.druid.segment.StorageAdapter; import io.druid.segment.TestHelper; import io.druid.segment.VirtualColumns; +import io.druid.segment.column.ValueType; import io.druid.segment.data.BitmapSerdeFactory; import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.data.IndexedInts; @@ -371,7 +375,7 @@ public abstract class BaseFilterTest } @Override - public ValueMatcher makeMatcher(ValueMatcherFactory factory) + public ValueMatcher makeMatcher(ColumnSelectorFactory factory) { return theFilter.makeMatcher(factory); } @@ -411,16 +415,25 @@ public abstract class BaseFilterTest return Sequences.toList(seq, new ArrayList>()).get(0); } - private List selectColumnValuesMatchingFilterUsingRowBasedValueMatcherFactory( + private List selectColumnValuesMatchingFilterUsingRowBasedColumnSelectorFactory( final DimFilter filter, final String selectColumn ) { - final RowBasedValueMatcherFactory matcherFactory = new RowBasedValueMatcherFactory(); - final ValueMatcher matcher = makeFilter(filter).makeMatcher(matcherFactory); + // Generate rowType + final Map rowSignature = Maps.newHashMap(); + for (String columnName : Iterables.concat(adapter.getAvailableDimensions(), adapter.getAvailableMetrics())) { + rowSignature.put(columnName, adapter.getColumnCapabilities(columnName).getType()); + } + + // Perform test + final SettableSupplier rowSupplier = new SettableSupplier<>(); + final ValueMatcher 
matcher = makeFilter(filter).makeMatcher( + RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature) + ); final List values = Lists.newArrayList(); for (InputRow row : rows) { - matcherFactory.setRow(row); + rowSupplier.set(row); if (matcher.matches()) { values.add((String) row.getRaw(selectColumn)); } @@ -449,9 +462,9 @@ public abstract class BaseFilterTest selectCountUsingFilteredAggregator(filter) ); Assert.assertEquals( - "RowBasedValueMatcherFactory: " + filter.toString(), + "RowBasedColumnSelectorFactory: " + filter.toString(), expectedRows, - selectColumnValuesMatchingFilterUsingRowBasedValueMatcherFactory(filter, "dim0") + selectColumnValuesMatchingFilterUsingRowBasedColumnSelectorFactory(filter, "dim0") ); } } diff --git a/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java b/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java index 754aa37c508..39771a04273 100644 --- a/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java +++ b/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java @@ -206,7 +206,7 @@ public class LongFilteringTest extends BaseFilterTest assertFilterMatches( new BoundDimFilter(LONG_COLUMN, " ", "4", false, false, null, null, StringComparators.NUMERIC), - ImmutableList.of() + ImmutableList.of("1", "2", "3", "4") ); assertFilterMatches( diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index d7e89b33726..cc19f3e30f1 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -158,7 +158,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( - makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, null) + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) ); } Integer rowIndex;
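For reference, a minimal, self-contained sketch of the row-matching pattern that the BaseFilterTest hunk above exercises: a SettableSupplier feeds the current row to a selector factory built with RowBasedColumnSelectorFactory.create(...), and the Filter builds its ValueMatcher directly from that factory instead of going through a ValueMatcherFactory. The class and method names below are illustrative only (not part of this patch), and generic parameters are assumed where the rendered diff omits them.

// Illustrative sketch, not part of the patch. Assumes RowBasedColumnSelectorFactory.create
// accepts a Supplier of rows plus a column-name -> ValueType signature, as used in
// BaseFilterTest above; the rows and signature would normally come from the adapter under test.
import io.druid.common.guava.SettableSupplier;
import io.druid.data.input.InputRow;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.segment.column.ValueType;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class RowBasedMatcherSketch
{
  public static List<String> matchingValues(
      final Filter filter,
      final List<InputRow> rows,
      final Map<String, ValueType> rowSignature,
      final String selectColumn
  )
  {
    // The supplier is mutated once per row; the matcher reads the current row through it.
    final SettableSupplier<InputRow> rowSupplier = new SettableSupplier<>();
    final ValueMatcher matcher = filter.makeMatcher(
        RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature)
    );

    final List<String> values = new ArrayList<>();
    for (InputRow row : rows) {
      rowSupplier.set(row);
      if (matcher.matches()) {
        values.add((String) row.getRaw(selectColumn));
      }
    }
    return values;
  }
}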