mirror of https://github.com/apache/druid.git
Filters: Use ColumnSelectorFactory directly for building row-based matchers. (#3797)
* Filters: Use ColumnSelectorFactory directly for building row-based matchers.

* Adjustments based on code review.
  - BoundDimFilter: fewer volatiles, rename matchesAnything to !matchesNothing.
  - HavingSpecs: Clarify that they are not thread-safe, and make DimFilterHavingSpec not thread safe.
  - Renamed rowType to rowSignature.
  - Added specializations for time-based vs non-time-based DimensionSelector in RowBasedColumnSelectorFactory.
  - Added convenience method DimensionHandlerUtils.createColumnSelectorPlus.
  - Added singleton ZeroIndexedInts.
  - Added test cases for DimFilterHavingSpec.

* Make ValueMatcherColumnSelectorStrategy actually use the associated selector.

* Add RangeIndexedInts.

* DimFilterHavingSpec: Fix concurrent usage guard on jdk7.

* Add assertion to ZeroIndexedInts.

* Rename no-longer-volatile members.
parent 80e2394f94
commit d8702ebece
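At a glance, the change replaces the old ValueMatcherFactory plumbing with ColumnSelectorFactory everywhere row-based filtering happens. As a rough sketch of the resulting flow (hypothetical glue code, not taken from this commit; it only uses classes and calls that appear in the diff below — SettableSupplier, RowBasedColumnSelectorFactory, Filters, ValueMatcher — while "rows", "dimFilter", and "rowSignature" are assumed inputs):

// Build a matcher once, then point the supplier at each row before calling matches().
final SettableSupplier<Row> rowSupplier = new SettableSupplier<>();
final ColumnSelectorFactory columnSelectorFactory =
    RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature);  // rowSignature: Map<String, ValueType>
final ValueMatcher matcher = Filters.toFilter(dimFilter).makeMatcher(columnSelectorFactory);

for (Row row : rows) {
  rowSupplier.set(row);       // the matcher reads whatever row the supplier currently holds
  if (matcher.matches()) {
    // row passes the filter
  }
}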
@@ -0,0 +1,50 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.common.guava;

import com.google.common.base.Supplier;

/**
 * A settable Supplier. Not thread safe.
 */
public class SettableSupplier<T> implements Supplier<T>
{
  private T obj;

  public SettableSupplier()
  {
  }

  public SettableSupplier(T initialValue)
  {
    obj = initialValue;
  }

  public void set(T obj)
  {
    this.obj = obj;
  }

  @Override
  public T get()
  {
    return obj;
  }
}
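For orientation (not part of the patch), the new holder behaves like any other Guava Supplier as long as all calls stay on one thread — a minimal hypothetical usage:

SettableSupplier<String> current = new SettableSupplier<>("first");
// get() returns whatever was set (or passed to the constructor) most recently.
assert "first".equals(current.get());
current.set("second");
assert "second".equals(current.get());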
@@ -89,8 +89,7 @@ public class InputRowSerde
          IncrementalIndex.makeColumnSelectorFactory(
              aggFactory,
              supplier,
-             true,
-             null
+             true
          )
      );
      try {
@@ -21,23 +21,9 @@ package io.druid.query.aggregation;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-import io.druid.query.ColumnSelectorPlus;
-import io.druid.query.dimension.DefaultDimensionSpec;
-import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
-import io.druid.query.filter.DruidLongPredicate;
-import io.druid.query.filter.DruidPredicateFactory;
import io.druid.query.filter.ValueMatcher;
-import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
-import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
-import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
-import io.druid.segment.DimensionHandlerUtils;
-import io.druid.segment.column.Column;
-import io.druid.segment.column.ColumnCapabilities;
-import io.druid.segment.column.ValueType;
-import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.filter.Filters;

import java.nio.ByteBuffer;
@@ -66,8 +52,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
  @Override
  public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory)
  {
-   final ValueMatcherFactory valueMatcherFactory = new FilteredAggregatorValueMatcherFactory(columnSelectorFactory);
-   final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(valueMatcherFactory);
+   final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(columnSelectorFactory);
    return new FilteredAggregator(
        valueMatcher,
        delegate.factorize(columnSelectorFactory)
@@ -77,8 +62,7 @@ public class FilteredAggregatorFactory extends AggregatorFactory
  @Override
  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory)
  {
-   final ValueMatcherFactory valueMatcherFactory = new FilteredAggregatorValueMatcherFactory(columnSelectorFactory);
-   final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(valueMatcherFactory);
+   final ValueMatcher valueMatcher = Filters.toFilter(filter).makeMatcher(columnSelectorFactory);
    return new FilteredBufferAggregator(
        valueMatcher,
        delegate.factorizeBuffered(columnSelectorFactory)
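Both factorize methods now hand the ColumnSelectorFactory straight to Filter.makeMatcher. To make the new contract concrete, here is a hypothetical, minimal filter fragment (illustration only, not code from this commit) that matches rows whose __time column equals a fixed value, using only the selector and matcher interfaces visible in this diff; "targetMillis" is an arbitrary constant chosen for the example:

@Override
public ValueMatcher makeMatcher(final ColumnSelectorFactory factory)
{
  final LongColumnSelector timeSelector = factory.makeLongColumnSelector("__time");
  final long targetMillis = 1483228800000L;
  return new ValueMatcher()
  {
    @Override
    public boolean matches()
    {
      // Evaluates against whatever row the factory's selectors currently point at.
      return timeSelector.get() == targetMillis;
    }
  };
}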
@ -208,81 +192,4 @@ public class FilteredAggregatorFactory extends AggregatorFactory
|
|||
result = 31 * result + (filter != null ? filter.hashCode() : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
private static class FilteredAggregatorValueMatcherFactory implements ValueMatcherFactory
|
||||
{
|
||||
private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY =
|
||||
new ValueMatcherColumnSelectorStrategyFactory();
|
||||
|
||||
private final ColumnSelectorFactory columnSelectorFactory;
|
||||
|
||||
public FilteredAggregatorValueMatcherFactory(ColumnSelectorFactory columnSelectorFactory)
|
||||
{
|
||||
this.columnSelectorFactory = columnSelectorFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final String value)
|
||||
{
|
||||
if (getTypeForDimension(dimension) == ValueType.LONG) {
|
||||
return Filters.getLongValueMatcher(
|
||||
columnSelectorFactory.makeLongColumnSelector(dimension),
|
||||
value
|
||||
);
|
||||
}
|
||||
|
||||
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
|
||||
DimensionHandlerUtils.createColumnSelectorPluses(
|
||||
STRATEGY_FACTORY,
|
||||
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
|
||||
columnSelectorFactory
|
||||
);
|
||||
|
||||
|
||||
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
|
||||
return strategy.getValueMatcher(dimension, columnSelectorFactory, value);
|
||||
}
|
||||
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
ValueType type = getTypeForDimension(dimension);
|
||||
switch (type) {
|
||||
case LONG:
|
||||
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
|
||||
case STRING:
|
||||
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
|
||||
DimensionHandlerUtils.createColumnSelectorPluses(
|
||||
STRATEGY_FACTORY,
|
||||
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
|
||||
columnSelectorFactory
|
||||
);
|
||||
|
||||
|
||||
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
|
||||
return strategy.getValueMatcher(dimension, columnSelectorFactory, predicateFactory);
|
||||
default:
|
||||
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
|
||||
}
|
||||
}
|
||||
|
||||
private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate)
|
||||
{
|
||||
return Filters.getLongPredicateMatcher(
|
||||
columnSelectorFactory.makeLongColumnSelector(dimension),
|
||||
predicate
|
||||
);
|
||||
}
|
||||
|
||||
private ValueType getTypeForDimension(String dimension)
|
||||
{
|
||||
// FilteredAggregatorFactory is sometimes created from a ColumnSelectorFactory that
|
||||
// has no knowledge of column capabilities/types.
|
||||
// Default to LONG for __time, STRING for everything else.
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return ValueType.LONG;
|
||||
}
|
||||
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimension);
|
||||
return capabilities == null ? ValueType.STRING : capabilities.getType();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -19,6 +19,8 @@

package io.druid.query.filter;

import io.druid.segment.ColumnSelectorFactory;

import java.util.List;

public interface BooleanFilter extends Filter
@@ -33,16 +35,17 @@ public interface BooleanFilter extends Filter
   * An implementation should either:
   * - return a ValueMatcher that checks row values, using the provided ValueMatcherFactory
   * - or, if possible, get a bitmap index for this filter using the BitmapIndexSelector, and
   *   return a ValueMatcher that checks the current row offset, created using the bitmap index.
   *   return a ValueMatcher that checks the current row offset, created using the bitmap index.
   *
   * @param selector Object used to retrieve bitmap indexes
-  * @param valueMatcherFactory Object used to create ValueMatchers
   * @param selector Object used to retrieve bitmap indexes
+  * @param columnSelectorFactory Object used to select columns for making ValueMatchers
   * @param rowOffsetMatcherFactory Object used to create RowOffsetMatchers
   *
   * @return ValueMatcher that applies this filter
   */
  public ValueMatcher makeMatcher(
      BitmapIndexSelector selector,
-     ValueMatcherFactory valueMatcherFactory,
+     ColumnSelectorFactory columnSelectorFactory,
      RowOffsetMatcherFactory rowOffsetMatcherFactory
  );
}
@@ -314,22 +314,27 @@ public class BoundDimFilter implements DimFilter

  private Supplier<DruidLongPredicate> makeLongPredicateSupplier()
  {
    return new Supplier<DruidLongPredicate>()
    class BoundLongPredicateSupplier implements Supplier<DruidLongPredicate>
    {
      private final Object initLock = new Object();

      // longsInitialized is volatile since it establishes the happens-before relationship on
      // writes/reads to the rest of the fields (it's written last and read first).
      private volatile boolean longsInitialized = false;
      private volatile boolean matchesAnything = true;
      private volatile boolean hasLowerLongBoundVolatile;
      private volatile boolean hasUpperLongBoundVolatile;
      private volatile long lowerLongBoundVolatile;
      private volatile long upperLongBoundVolatile;

      // Other fields are not volatile.
      private boolean matchesNothing;
      private boolean hasLowerLongBound;
      private boolean hasUpperLongBound;
      private long lowerLongBound;
      private long upperLongBound;

      @Override
      public DruidLongPredicate get()
      {
        initLongData();

        if (!matchesAnything) {
        if (matchesNothing) {
          return new DruidLongPredicate()
          {
            @Override
@@ -342,10 +347,10 @@ public class BoundDimFilter implements DimFilter

        return new DruidLongPredicate()
        {
-         private final boolean hasLowerLongBound = hasLowerLongBoundVolatile;
-         private final boolean hasUpperLongBound = hasUpperLongBoundVolatile;
-         private final long lowerLongBound = hasLowerLongBound ? lowerLongBoundVolatile : 0L;
-         private final long upperLongBound = hasUpperLongBound ? upperLongBoundVolatile : 0L;
+         private final boolean hasLowerLongBound = BoundLongPredicateSupplier.this.hasLowerLongBound;
+         private final boolean hasUpperLongBound = BoundLongPredicateSupplier.this.hasUpperLongBound;
+         private final long lowerLongBound = hasLowerLongBound ? BoundLongPredicateSupplier.this.lowerLongBound : 0L;
+         private final long upperLongBound = hasUpperLongBound ? BoundLongPredicateSupplier.this.upperLongBound : 0L;

          @Override
          public boolean applyLong(long input)
@@ -382,35 +387,39 @@ public class BoundDimFilter implements DimFilter
          return;
        }

        matchesNothing = false;

        if (hasLowerBound()) {
          final Long lowerLong = GuavaUtils.tryParseLong(lower);
          if (lowerLong == null) {
            matchesAnything = false;
            return;
            // Unparseable values fall before all actual numbers, so all numbers will match the lower bound.
            hasLowerLongBound = false;
          } else {
            hasLowerLongBound = true;
            lowerLongBound = lowerLong;
          }

          hasLowerLongBoundVolatile = true;
          lowerLongBoundVolatile = lowerLong;
        } else {
          hasLowerLongBoundVolatile = false;
          hasLowerLongBound = false;
        }

        if (hasUpperBound()) {
          Long upperLong = GuavaUtils.tryParseLong(upper);
          if (upperLong == null) {
            matchesAnything = false;
            // Unparseable values fall before all actual numbers, so no numbers can match the upper bound.
            matchesNothing = true;
            return;
          }

          hasUpperLongBoundVolatile = true;
          upperLongBoundVolatile = upperLong;
          hasUpperLongBound = true;
          upperLongBound = upperLong;
        } else {
          hasUpperLongBoundVolatile = false;
          hasUpperLongBound = false;
        }

        longsInitialized = true;
      }
    }
    };
  }
    return new BoundLongPredicateSupplier();
  }
}
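The comment in the new supplier describes a publication pattern: only the guard flag stays volatile, written last by the initializer and read first by callers, so the plain fields written before it are safely published. A distilled, hypothetical sketch of that pattern in isolation (not code from this commit):

class LazyBounds
{
  private final Object initLock = new Object();

  private volatile boolean initialized = false;
  // Plain (non-volatile) fields, published via the volatile write below.
  private long lowerBound;
  private long upperBound;

  void ensureInitialized(long lower, long upper)
  {
    if (initialized) {
      return;                 // volatile read happens first
    }
    synchronized (initLock) {
      if (!initialized) {
        lowerBound = lower;   // plain writes...
        upperBound = upper;
        initialized = true;   // ...then the volatile write, last
      }
    }
  }
}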
@@ -20,6 +20,7 @@
package io.druid.query.filter;

import io.druid.collections.bitmap.ImmutableBitmap;
+import io.druid.segment.ColumnSelectorFactory;

/**
 */
@@ -40,7 +41,7 @@ public interface Filter
   * @param factory Object used to create ValueMatchers
   * @return ValueMatcher that applies this filter to row values.
   */
-  public ValueMatcher makeMatcher(ValueMatcherFactory factory);
+  public ValueMatcher makeMatcher(ColumnSelectorFactory factory);


  /**
@ -21,30 +21,29 @@ package io.druid.query.filter;
|
|||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
|
||||
import java.util.BitSet;
|
||||
import java.util.Objects;
|
||||
|
||||
public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherColumnSelectorStrategy
|
||||
public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherColumnSelectorStrategy<DimensionSelector>
|
||||
{
|
||||
@Override
|
||||
public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final String value)
|
||||
public ValueMatcher makeValueMatcher(final DimensionSelector selector, final String value)
|
||||
{
|
||||
final String valueStr = Strings.emptyToNull(value);
|
||||
final DimensionSelector selector = cursor.makeDimensionSelector(
|
||||
new DefaultDimensionSpec(columnName, columnName)
|
||||
);
|
||||
|
||||
// if matching against null, rows with size 0 should also match
|
||||
final boolean matchNull = Strings.isNullOrEmpty(valueStr);
|
||||
|
||||
final int cardinality = selector.getValueCardinality();
|
||||
|
||||
if (cardinality >= 0) {
|
||||
if (cardinality == 0 || (cardinality == 1 && selector.lookupName(0) == null)) {
|
||||
// All values are null or empty rows (which match nulls anyway). No need to check each row.
|
||||
return new BooleanValueMatcher(matchNull);
|
||||
} else if (cardinality >= 0) {
|
||||
// Dictionary-encoded dimension. Compare by id instead of by value to save time.
|
||||
final int valueId = selector.lookupId(valueStr);
|
||||
|
||||
|
@ -94,17 +93,19 @@ public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherCol
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory)
|
||||
public ValueMatcher makeValueMatcher(
|
||||
final DimensionSelector selector,
|
||||
final DruidPredicateFactory predicateFactory
|
||||
)
|
||||
{
|
||||
final DimensionSelector selector = cursor.makeDimensionSelector(
|
||||
new DefaultDimensionSpec(columnName, columnName)
|
||||
);
|
||||
|
||||
final Predicate<String> predicate = predicateFactory.makeStringPredicate();
|
||||
final int cardinality = selector.getValueCardinality();
|
||||
final boolean matchNull = predicate.apply(null);
|
||||
|
||||
if (cardinality >= 0) {
|
||||
if (cardinality == 0 || (cardinality == 1 && selector.lookupName(0) == null)) {
|
||||
// All values are null or empty rows (which match nulls anyway). No need to check each row.
|
||||
return new BooleanValueMatcher(matchNull);
|
||||
} else if (cardinality >= 0) {
|
||||
// Dictionary-encoded dimension. Check every value; build a bitset of matching ids.
|
||||
final BitSet valueIds = new BitSet(cardinality);
|
||||
for (int i = 0; i < cardinality; i++) {
|
||||
|
|
|
@ -20,26 +20,25 @@
|
|||
package io.druid.query.filter;
|
||||
|
||||
import io.druid.query.dimension.ColumnSelectorStrategy;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.ColumnValueSelector;
|
||||
|
||||
public interface ValueMatcherColumnSelectorStrategy extends ColumnSelectorStrategy
|
||||
public interface ValueMatcherColumnSelectorStrategy<ValueSelectorType extends ColumnValueSelector> extends ColumnSelectorStrategy
|
||||
{
|
||||
/**
|
||||
* Create a single value ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory.
|
||||
* Create a single value ValueMatcher.
|
||||
*
|
||||
* @param cursor ColumnSelectorFactory for creating dimension value selectors
|
||||
* @param selector Column selector
|
||||
* @param value Value to match against
|
||||
* @return ValueMatcher that matches on 'value'
|
||||
*/
|
||||
ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, String value);
|
||||
|
||||
ValueMatcher makeValueMatcher(ValueSelectorType selector, String value);
|
||||
|
||||
/**
|
||||
* Create a predicate-based ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory.
|
||||
* Create a predicate-based ValueMatcher.
|
||||
*
|
||||
* @param cursor ColumnSelectorFactory for creating dimension value selectors
|
||||
* @param selector Column selector
|
||||
* @param predicateFactory A DruidPredicateFactory that provides the filter predicates to be matched
|
||||
* @return A ValueMatcher that applies the predicate for this DimensionQueryHelper's value type from the predicateFactory
|
||||
*/
|
||||
ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory);
|
||||
ValueMatcher makeValueMatcher(ValueSelectorType selector, DruidPredicateFactory predicateFactory);
|
||||
}
|
||||
|
|
|
@@ -27,6 +27,18 @@ import io.druid.segment.column.ValueType;
public class ValueMatcherColumnSelectorStrategyFactory
    implements ColumnSelectorStrategyFactory<ValueMatcherColumnSelectorStrategy>
{
+  private static final ValueMatcherColumnSelectorStrategyFactory INSTANCE = new ValueMatcherColumnSelectorStrategyFactory();
+
+  private ValueMatcherColumnSelectorStrategyFactory()
+  {
+    // Singleton.
+  }
+
+  public static ValueMatcherColumnSelectorStrategyFactory instance()
+  {
+    return INSTANCE;
+  }
+
  @Override
  public ValueMatcherColumnSelectorStrategy makeColumnSelectorStrategy(
      ColumnCapabilities capabilities
@ -1,63 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.filter;
|
||||
|
||||
/**
|
||||
* A ValueMatcherFactory is an object associated with a collection of rows (e.g., an IncrementalIndexStorageAdapter)
|
||||
* that generates ValueMatchers for filtering on the associated collection of rows.
|
||||
*
|
||||
* A ValueMatcher is an object that decides whether a row value matches a value or predicate
|
||||
* associated with the ValueMatcher.
|
||||
*
|
||||
* The ValueMatcher is expected to track the current row to be matched with a stateful
|
||||
* object (e.g., a ColumnSelectorFactory). The ValueMatcher has no responsibility for moving the current
|
||||
* "row pointer", this is handled outside of the ValueMatcher.
|
||||
*
|
||||
* The ValueMatcherFactory/ValueMatcher classes are used for filtering rows during column scans.
|
||||
*/
|
||||
public interface ValueMatcherFactory
|
||||
{
|
||||
/**
|
||||
* Create a ValueMatcher that compares row values to the provided string.
|
||||
*
|
||||
* An implementation of this method should be able to handle dimensions of various types.
|
||||
*
|
||||
* @param dimension The dimension to filter.
|
||||
* @param value The value to match against, represented as a String.
|
||||
*
|
||||
* @return An object that matches row values on the provided value.
|
||||
*/
|
||||
public ValueMatcher makeValueMatcher(String dimension, String value);
|
||||
|
||||
/**
|
||||
* Create a ValueMatcher that applies a predicate to row values.
|
||||
*
|
||||
* The caller provides a predicate factory that can create a predicate for each value type supported by Druid.
|
||||
* See {@link DruidPredicateFactory} for more information.
|
||||
*
|
||||
* When creating the ValueMatcher, the ValueMatcherFactory implementation should decide what type of predicate
|
||||
* to create from the predicate factory based on the ValueType of the specified dimension.
|
||||
*
|
||||
* @param dimension The dimension to filter.
|
||||
* @param predicateFactory Predicate factory
|
||||
* @return An object that applies a predicate to row values
|
||||
*/
|
||||
public ValueMatcher makeValueMatcher(String dimension, DruidPredicateFactory predicateFactory);
|
||||
}
|
|
@@ -143,6 +143,7 @@ public class GroupByQuery extends BaseQuery<Row>
      @Override
      public Sequence<Row> apply(Sequence<Row> input)
      {
+       GroupByQuery.this.havingSpec.setRowSignature(GroupByQueryHelper.rowSignatureFor(GroupByQuery.this));
        return Sequences.filter(
            input,
            new Predicate<Row>()
@@ -369,6 +370,15 @@ public class GroupByQuery extends BaseQuery<Row>
    return 0;
  }

+ /**
+  * Apply the havingSpec and limitSpec. Because havingSpecs are not thread safe, and because they are applied during
+  * accumulation of the returned sequence, callers must take care to avoid accumulating two different Sequences
+  * returned by this method in two different threads.
+  *
+  * @param results sequence of rows to apply havingSpec and limitSpec to
+  *
+  * @return sequence of rows after applying havingSpec and limitSpec
+  */
  public Sequence<Row> applyLimit(Sequence<Row> results)
  {
    return limitFn.apply(results);
@ -19,7 +19,9 @@
|
|||
|
||||
package io.druid.query.groupby;
|
||||
|
||||
import com.google.common.base.Enums;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
|
@ -37,6 +39,7 @@ import io.druid.java.util.common.guava.Sequences;
|
|||
import io.druid.query.ResourceLimitExceededException;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexSchema;
|
||||
import io.druid.segment.incremental.IndexSizeExceededException;
|
||||
|
@ -45,6 +48,7 @@ import io.druid.segment.incremental.OnheapIncrementalIndex;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
|
@@ -219,4 +223,34 @@ public class GroupByQueryHelper
        }
    );
  }
+
+ /**
+  * Returns types for fields that will appear in the Rows output from "query". Useful for feeding them into
+  * {@link RowBasedColumnSelectorFactory}.
+  *
+  * @param query groupBy query
+  *
+  * @return row types
+  */
+ public static Map<String, ValueType> rowSignatureFor(final GroupByQuery query)
+ {
+   final ImmutableMap.Builder<String, ValueType> types = ImmutableMap.builder();
+
+   for (DimensionSpec dimensionSpec : query.getDimensions()) {
+     types.put(dimensionSpec.getOutputName(), ValueType.STRING);
+   }
+
+   for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
+     final String typeName = aggregatorFactory.getTypeName();
+     final ValueType valueType = typeName != null
+                                 ? Enums.getIfPresent(ValueType.class, typeName.toUpperCase()).orNull()
+                                 : null;
+     if (valueType != null) {
+       types.put(aggregatorFactory.getName(), valueType);
+     }
+   }
+
+   // Don't include post-aggregators since we don't know what types they are.
+   return types.build();
+ }
}
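As a concrete illustration (hypothetical query, not from the patch): a groupBy over one dimension "country" with a single long-typed aggregator named "rows" would yield a signature like the following; post-aggregators are intentionally left out because their output types are unknown:

Map<String, ValueType> signature = ImmutableMap.of(
    "country", ValueType.STRING,  // dimensions are always typed as STRING
    "rows", ValueType.LONG        // assuming the aggregator's getTypeName() parses to LONG
);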
@ -0,0 +1,297 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.groupby;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.math.expr.Evals;
|
||||
import io.druid.math.expr.Expr;
|
||||
import io.druid.math.expr.Parser;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.NumericColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.data.RangeIndexedInts;
|
||||
import io.druid.segment.data.ZeroIndexedInts;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
private final Supplier<? extends Row> row;
|
||||
private final Map<String, ValueType> rowSignature;
|
||||
|
||||
private RowBasedColumnSelectorFactory(
|
||||
final Supplier<? extends Row> row,
|
||||
@Nullable final Map<String, ValueType> rowSignature
|
||||
)
|
||||
{
|
||||
this.row = row;
|
||||
this.rowSignature = rowSignature != null ? rowSignature : ImmutableMap.<String, ValueType>of();
|
||||
}
|
||||
|
||||
public static RowBasedColumnSelectorFactory create(
|
||||
final Supplier<? extends Row> row,
|
||||
@Nullable final Map<String, ValueType> rowSignature
|
||||
)
|
||||
{
|
||||
return new RowBasedColumnSelectorFactory(row, rowSignature);
|
||||
}
|
||||
|
||||
public static RowBasedColumnSelectorFactory create(
|
||||
final ThreadLocal<? extends Row> row,
|
||||
@Nullable final Map<String, ValueType> rowSignature
|
||||
)
|
||||
{
|
||||
return new RowBasedColumnSelectorFactory(
|
||||
new Supplier<Row>()
|
||||
{
|
||||
@Override
|
||||
public Row get()
|
||||
{
|
||||
return row.get();
|
||||
}
|
||||
},
|
||||
rowSignature
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
// This dimension selector does not have an associated lookup dictionary, which means lookup can only be done
|
||||
// on the same row. Hence it returns CARDINALITY_UNKNOWN from getValueCardinality.
|
||||
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
|
||||
}
|
||||
|
||||
private DimensionSelector makeDimensionSelectorUndecorated(DimensionSpec dimensionSpec)
|
||||
{
|
||||
final String dimension = dimensionSpec.getDimension();
|
||||
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
|
||||
|
||||
if (Column.TIME_COLUMN_NAME.equals(dimensionSpec.getDimension())) {
|
||||
if (extractionFn == null) {
|
||||
throw new UnsupportedOperationException("time dimension must provide an extraction function");
|
||||
}
|
||||
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return ZeroIndexedInts.instance();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return DimensionSelector.CARDINALITY_UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
return extractionFn.apply(row.get().getTimestampFromEpoch());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
throw new UnsupportedOperationException("lookupId");
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
final List<String> dimensionValues = row.get().getDimension(dimension);
|
||||
return RangeIndexedInts.create(dimensionValues != null ? dimensionValues.size() : 0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return DimensionSelector.CARDINALITY_UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
final String value = Strings.emptyToNull(row.get().getDimension(dimension).get(id));
|
||||
return extractionFn == null ? value : extractionFn.apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
throw new UnsupportedOperationException("lookupId");
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
|
||||
{
|
||||
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return (float) row.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return row.get().getFloatMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(final String columnName)
|
||||
{
|
||||
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getLongMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(final String columnName)
|
||||
{
|
||||
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new ObjectColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Long.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return row.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new ObjectColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return row.get().getRaw(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumericColumnSelector makeMathExpressionSelector(String expression)
|
||||
{
|
||||
final Expr parsed = Parser.parse(expression);
|
||||
|
||||
final List<String> required = Parser.findRequiredBindings(parsed);
|
||||
final Map<String, Supplier<Number>> values = Maps.newHashMapWithExpectedSize(required.size());
|
||||
|
||||
for (final String columnName : required) {
|
||||
values.put(
|
||||
columnName, new Supplier<Number>()
|
||||
{
|
||||
@Override
|
||||
public Number get()
|
||||
{
|
||||
return Evals.toNumber(row.get().getRaw(columnName));
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
final Expr.ObjectBinding binding = Parser.withSuppliers(values);
|
||||
|
||||
return new NumericColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Number get()
|
||||
{
|
||||
return parsed.eval(binding).numericValue();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
if (Column.TIME_COLUMN_NAME.equals(columnName)) {
|
||||
// TIME_COLUMN_NAME is handled specially; override the provided rowSignature.
|
||||
return new ColumnCapabilitiesImpl().setType(ValueType.LONG);
|
||||
} else {
|
||||
final ValueType valueType = rowSignature.get(columnName);
|
||||
|
||||
// Do _not_ set isDictionaryEncoded or hasBitmapIndexes, because Row-based columns do not have those things.
|
||||
return valueType != null
|
||||
? new ColumnCapabilitiesImpl().setType(valueType)
|
||||
: new ColumnCapabilitiesImpl().setType(ValueType.STRING);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,164 +0,0 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.groupby;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import io.druid.common.guava.GuavaUtils;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class RowBasedValueMatcherFactory implements ValueMatcherFactory
|
||||
{
|
||||
private Row row;
|
||||
|
||||
public void setRow(Row row)
|
||||
{
|
||||
this.row = row;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final String value)
|
||||
{
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||
if (value == null) {
|
||||
return new BooleanValueMatcher(false);
|
||||
}
|
||||
|
||||
final Long longValue = GuavaUtils.tryParseLong(value);
|
||||
if (longValue == null) {
|
||||
return new BooleanValueMatcher(false);
|
||||
}
|
||||
|
||||
return new ValueMatcher()
|
||||
{
|
||||
// store the primitive, so we don't unbox for every comparison
|
||||
final long unboxedLong = longValue;
|
||||
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return row.getTimestampFromEpoch() == unboxedLong;
|
||||
}
|
||||
};
|
||||
} else {
|
||||
final String valueOrNull = Strings.emptyToNull(value);
|
||||
return new ValueMatcher()
|
||||
{
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return doesMatch(row.getRaw(dimension), valueOrNull);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// There is no easy way to determine column types from a Row, so this generates all possible predicates and then
|
||||
// uses instanceof checks to determine which one to apply to each row.
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new ValueMatcher()
|
||||
{
|
||||
final DruidLongPredicate predicate = predicateFactory.makeLongPredicate();
|
||||
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return predicate.applyLong(row.getTimestampFromEpoch());
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new ValueMatcher()
|
||||
{
|
||||
final Predicate<String> stringPredicate = predicateFactory.makeStringPredicate();
|
||||
final DruidLongPredicate longPredicate = predicateFactory.makeLongPredicate();
|
||||
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
return doesMatch(row.getRaw(dimension), stringPredicate, longPredicate);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Precondition: value must be run through Strings.emptyToNull
|
||||
private boolean doesMatch(final Object raw, final String value)
|
||||
{
|
||||
if (raw == null) {
|
||||
return value == null;
|
||||
} else if (raw instanceof List) {
|
||||
final List theList = (List) raw;
|
||||
if (theList.isEmpty()) {
|
||||
// null should match empty rows in multi-value columns
|
||||
return value == null;
|
||||
} else {
|
||||
for (Object o : theList) {
|
||||
if (doesMatch(o, value)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return Objects.equals(Strings.emptyToNull(raw.toString()), value);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean doesMatch(
|
||||
final Object raw,
|
||||
final Predicate<String> stringPredicate,
|
||||
final DruidLongPredicate longPredicate
|
||||
)
|
||||
{
|
||||
if (raw == null) {
|
||||
return stringPredicate.apply(null);
|
||||
} else if (raw instanceof List) {
|
||||
final List theList = (List) raw;
|
||||
if (theList.isEmpty()) {
|
||||
return stringPredicate.apply(null);
|
||||
} else {
|
||||
for (Object o : theList) {
|
||||
if (doesMatch(o, stringPredicate, longPredicate)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
} else if (raw instanceof Long || raw instanceof Integer) {
|
||||
return longPredicate.applyLong(((Number) raw).longValue());
|
||||
} else {
|
||||
return stringPredicate.apply(Strings.emptyToNull(raw.toString()));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import com.google.common.base.Predicate;
|
|||
import com.google.common.collect.Lists;
|
||||
import io.druid.collections.BlockingPool;
|
||||
import io.druid.collections.ReferenceCountingResourceHolder;
|
||||
import io.druid.common.guava.SettableSupplier;
|
||||
import io.druid.common.utils.JodaUtils;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.java.util.common.Pair;
|
||||
|
@ -41,8 +42,9 @@ import io.druid.query.filter.Filter;
|
|||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.RowBasedValueMatcherFactory;
|
||||
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
|
||||
import io.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.RowBasedKey;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import org.joda.time.DateTime;
|
||||
|
@ -53,6 +55,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
|
@ -61,6 +64,7 @@ public class GroupByRowProcessor
|
|||
public static Sequence<Row> process(
|
||||
final Query queryParam,
|
||||
final Sequence<Row> rows,
|
||||
final Map<String, ValueType> rowSignature,
|
||||
final GroupByQueryConfig config,
|
||||
final BlockingPool<ByteBuffer> mergeBufferPool,
|
||||
final ObjectMapper spillMapper
|
||||
|
@ -86,10 +90,15 @@ public class GroupByRowProcessor
|
|||
query,
|
||||
Filters.toFilter(query.getDimFilter())
|
||||
);
|
||||
final RowBasedValueMatcherFactory filterMatcherFactory = new RowBasedValueMatcherFactory();
|
||||
|
||||
final SettableSupplier<Row> rowSupplier = new SettableSupplier<>();
|
||||
final RowBasedColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create(
|
||||
rowSupplier,
|
||||
rowSignature
|
||||
);
|
||||
final ValueMatcher filterMatcher = filter == null
|
||||
? new BooleanValueMatcher(true)
|
||||
: filter.makeMatcher(filterMatcherFactory);
|
||||
: filter.makeMatcher(columnSelectorFactory);
|
||||
|
||||
final FilteredSequence<Row> filteredSequence = new FilteredSequence<>(
|
||||
rows,
|
||||
|
@ -108,7 +117,7 @@ public class GroupByRowProcessor
|
|||
if (!inInterval) {
|
||||
return false;
|
||||
}
|
||||
filterMatcherFactory.setRow(input);
|
||||
rowSupplier.set(input);
|
||||
return filterMatcher.matches();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Chars;
|
||||
|
@ -36,31 +35,19 @@ import io.druid.data.input.Row;
|
|||
import io.druid.granularity.AllGranularity;
|
||||
import io.druid.java.util.common.Pair;
|
||||
import io.druid.java.util.common.guava.Accumulator;
|
||||
import io.druid.math.expr.Evals;
|
||||
import io.druid.math.expr.Expr;
|
||||
import io.druid.math.expr.Parser;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.GroupByQueryHelper;
|
||||
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
|
||||
import io.druid.query.groupby.strategy.GroupByStrategyV2;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.NumericColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterator;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterators;
|
||||
import org.joda.time.DateTime;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
@ -93,7 +80,11 @@ public class RowBasedGrouperHelper
|
|||
query.getDimensions().size(),
|
||||
querySpecificConfig.getMaxMergingDictionarySize() / (concurrencyHint == -1 ? 1 : concurrencyHint)
|
||||
);
|
||||
final RowBasedColumnSelectorFactory columnSelectorFactory = new RowBasedColumnSelectorFactory();
|
||||
final ThreadLocal<Row> columnSelectorRow = new ThreadLocal<>();
|
||||
final ColumnSelectorFactory columnSelectorFactory = RowBasedColumnSelectorFactory.create(
|
||||
columnSelectorRow,
|
||||
GroupByQueryHelper.rowSignatureFor(query)
|
||||
);
|
||||
final Grouper<RowBasedKey> grouper;
|
||||
if (concurrencyHint == -1) {
|
||||
grouper = new SpillingGrouper<>(
|
||||
|
@ -150,8 +141,7 @@ public class RowBasedGrouperHelper
|
|||
return null;
|
||||
}
|
||||
|
||||
|
||||
columnSelectorFactory.setRow(row);
|
||||
columnSelectorRow.set(row);
|
||||
|
||||
final int dimStart;
|
||||
final Comparable[] key;
|
||||
|
@ -193,7 +183,7 @@ public class RowBasedGrouperHelper
|
|||
// null return means grouping resources were exhausted.
|
||||
return null;
|
||||
}
|
||||
columnSelectorFactory.setRow(null);
|
||||
columnSelectorRow.set(null);
|
||||
|
||||
return theGrouper;
|
||||
}
|
||||
|
@ -623,197 +613,4 @@ public class RowBasedGrouperHelper
|
|||
return idx;
|
||||
}
|
||||
}
|
||||
|
||||
private static class RowBasedColumnSelectorFactory implements ColumnSelectorFactory
|
||||
{
|
||||
private ThreadLocal<Row> row = new ThreadLocal<>();
|
||||
|
||||
public void setRow(Row row)
|
||||
{
|
||||
this.row.set(row);
|
||||
}
|
||||
|
||||
// This dimension selector does not have an associated lookup dictionary, which means lookup can only be done
|
||||
// on the same row. This dimension selector is used for applying the extraction function on dimension, which
|
||||
// requires a DimensionSelector implementation
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(
|
||||
DimensionSpec dimensionSpec
|
||||
)
|
||||
{
|
||||
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
|
||||
}
|
||||
|
||||
private DimensionSelector makeDimensionSelectorUndecorated(
|
||||
DimensionSpec dimensionSpec
|
||||
)
|
||||
{
|
||||
final String dimension = dimensionSpec.getDimension();
|
||||
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
|
||||
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
final List<String> dimensionValues = row.get().getDimension(dimension);
|
||||
|
||||
final int dimensionValuesSize = dimensionValues != null ? dimensionValues.size() : 0;
|
||||
|
||||
return new IndexedInts()
|
||||
{
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return dimensionValuesSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
if (index < 0 || index >= dimensionValuesSize) {
|
||||
throw new IndexOutOfBoundsException("index: " + index);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntIterator iterator()
|
||||
{
|
||||
return IntIterators.fromTo(0, dimensionValuesSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fill(int index, int[] toFill)
|
||||
{
|
||||
throw new UnsupportedOperationException("fill not supported");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return DimensionSelector.CARDINALITY_UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
final String value = row.get().getDimension(dimension).get(id);
|
||||
return extractionFn == null ? value : extractionFn.apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
|
||||
}
|
||||
return row.get().getDimension(dimension).indexOf(name);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
|
||||
{
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return row.get().getFloatMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(final String columnName)
|
||||
{
|
||||
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
}
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return row.get().getLongMetric(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(final String columnName)
|
||||
{
|
||||
return new ObjectColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return row.get().getRaw(columnName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumericColumnSelector makeMathExpressionSelector(String expression)
|
||||
{
|
||||
final Expr parsed = Parser.parse(expression);
|
||||
|
||||
final List<String> required = Parser.findRequiredBindings(parsed);
|
||||
final Map<String, Supplier<Number>> values = Maps.newHashMapWithExpectedSize(required.size());
|
||||
|
||||
for (final String columnName : required) {
|
||||
values.put(
|
||||
columnName, new Supplier<Number>()
|
||||
{
|
||||
@Override
|
||||
public Number get()
|
||||
{
|
||||
return Evals.toNumber(row.get().getRaw(columnName));
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
final Expr.ObjectBinding binding = Parser.withSuppliers(values);
|
||||
|
||||
return new NumericColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Number get()
|
||||
{
|
||||
return parsed.eval(binding).numericValue();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
// We don't have any information on the column value type, returning null defaults type to string
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@@ -24,7 +24,7 @@ import io.druid.data.input.Row;
/**
 * A "having" spec that always evaluates to true
 */
-public class AlwaysHavingSpec implements HavingSpec
+public class AlwaysHavingSpec extends BaseHavingSpec
{
  private static final byte CACHE_KEY = 0x0;

|
|
@ -23,14 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The logical "and" operator for the "having" clause.
|
||||
*/
|
||||
public class AndHavingSpec implements HavingSpec
|
||||
public class AndHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x2;
|
||||
|
||||
|
@ -48,6 +50,14 @@ public class AndHavingSpec implements HavingSpec
|
|||
return havingSpecs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRowSignature(Map<String, ValueType> rowSignature)
|
||||
{
|
||||
for (HavingSpec havingSpec : havingSpecs) {
|
||||
havingSpec.setRowSignature(rowSignature);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.groupby.having;
|
||||
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class BaseHavingSpec implements HavingSpec
|
||||
{
|
||||
@Override
|
||||
public void setRowSignature(Map<String, ValueType> rowSignature)
|
||||
{
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
|
@@ -22,20 +22,24 @@ package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
+import io.druid.common.guava.SettableSupplier;
import io.druid.data.input.Row;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.ValueMatcher;
-import io.druid.query.groupby.RowBasedValueMatcherFactory;
+import io.druid.query.groupby.RowBasedColumnSelectorFactory;
+import io.druid.segment.column.ValueType;

import java.nio.ByteBuffer;
+import java.util.Map;

-public class DimFilterHavingSpec implements HavingSpec
+public class DimFilterHavingSpec extends BaseHavingSpec
{
  private static final byte CACHE_KEY = (byte) 0x9;

  private final DimFilter dimFilter;
- private final RowBasedValueMatcherFactory valueMatcherFactory;
- private final ValueMatcher valueMatcher;
+ private final SettableSupplier<Row> rowSupplier;
+ private ValueMatcher valueMatcher;
+ private int evalCount;

  @JsonCreator
  public DimFilterHavingSpec(
@@ -43,8 +47,7 @@ public class DimFilterHavingSpec implements HavingSpec
  )
  {
    this.dimFilter = Preconditions.checkNotNull(dimFilter, "filter");
-   this.valueMatcherFactory = new RowBasedValueMatcherFactory();
-   this.valueMatcher = dimFilter.toFilter().makeMatcher(valueMatcherFactory);
+   this.rowSupplier = new SettableSupplier<>();
  }

  @JsonProperty("filter")
@@ -53,17 +56,25 @@ public class DimFilterHavingSpec implements HavingSpec
    return dimFilter;
  }

  @Override
+ public void setRowSignature(Map<String, ValueType> rowSignature)
+ {
+   this.valueMatcher = dimFilter.toFilter()
+                                .makeMatcher(RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature));
+ }

  @Override
  public boolean eval(final Row row)
  {
-   // Not thread safe, but it doesn't have to be.
-   valueMatcherFactory.setRow(row);
-   try {
-     return valueMatcher.matches();
-   }
-   finally {
-     valueMatcherFactory.setRow(null);
+   int oldEvalCount = evalCount;
+   evalCount++;
+   rowSupplier.set(row);
+   final boolean retVal = valueMatcher.matches();
+   if (evalCount != oldEvalCount + 1) {
+     // Oops, someone was using this from two different threads, bad caller.
+     throw new IllegalStateException("concurrent 'eval' calls not permitted!");
+   }
+   return retVal;
  }

  @Override
|
||||
|
|
|
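The SettableSupplier plus RowBasedColumnSelectorFactory combination means the matcher is built once, in setRowSignature, and rows are then fed to it one at a time through the supplier. A hedged sketch of the expected single-threaded driving loop (the helper name and surrounding types are illustrative):

  // Illustrative: how a merge step would apply the spec. Must stay on one thread;
  // interleaved eval() calls trip the evalCount guard with an IllegalStateException.
  static List<Row> applyHaving(DimFilterHavingSpec having, Map<String, ValueType> rowSignature, Iterable<Row> rows)
  {
    having.setRowSignature(rowSignature);  // builds the ValueMatcher against the row-based selector factory
    final List<Row> kept = new ArrayList<>();
    for (Row row : rows) {
      if (having.eval(row)) {
        kept.add(row);
      }
    }
    return kept;
  }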
@ -31,7 +31,7 @@ import io.druid.query.extraction.IdentityExtractionFn;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
|
||||
public class DimensionSelectorHavingSpec implements HavingSpec
|
||||
public class DimensionSelectorHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = (byte) 0x8;
|
||||
private static final byte STRING_SEPARATOR = (byte) 0xFF;
|
||||
|
|
|
@ -32,7 +32,7 @@ import java.util.Arrays;
|
|||
* The "=" operator in a "having" clause. This is similar to SQL's "having aggregation = value",
|
||||
* except that in SQL an aggregation is an expression instead of an aggregation name as in Druid.
|
||||
*/
|
||||
public class EqualToHavingSpec implements HavingSpec
|
||||
public class EqualToHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x3;
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ import java.util.Arrays;
|
|||
* The ">" operator in a "having" clause. This is similar to SQL's "having aggregation > value",
|
||||
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
|
||||
*/
|
||||
public class GreaterThanHavingSpec implements HavingSpec
|
||||
public class GreaterThanHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x4;
|
||||
|
||||
|
|
|
@@ -22,10 +22,14 @@ package io.druid.query.groupby.having;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.data.input.Row;
import io.druid.segment.column.ValueType;

import java.util.Map;

/**
 * A "having" clause that filters aggregated/dimension value. This is similar to SQL's "having"
 * clause.
 * clause. HavingSpec objects are *not* thread-safe and must not be used simultaneously by multiple
 * threads.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {

@@ -46,6 +50,14 @@ public interface HavingSpec
  public static final HavingSpec NEVER = new NeverHavingSpec();
  public static final HavingSpec ALWAYS = new AlwaysHavingSpec();

  /**
   * Informs this HavingSpec that rows passed to "eval" will have a certain signature. Will be called
   * before "eval".
   *
   * @param rowSignature signature of the rows
   */
  public void setRowSignature(Map<String, ValueType> rowSignature);

  /**
   * Evaluates if a given row satisfies the having spec.
   *
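The contract, then, is to build a map from output column name to ValueType, hand it to the root having spec once, and only then start calling eval; composite specs such as AndHavingSpec, OrHavingSpec and NotHavingSpec simply forward the signature to their children. A small illustration with made-up column names:

  // Illustrative row signature for a group-by with one string dimension and two aggregators.
  Map<String, ValueType> rowSignature = ImmutableMap.of(
      "country", ValueType.STRING,
      "rows", ValueType.LONG,
      "avg_price", ValueType.FLOAT
  );
  havingSpec.setRowSignature(rowSignature);  // must precede the first eval(row)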
@ -31,7 +31,7 @@ import java.util.Arrays;
|
|||
* The "<" operator in a "having" clause. This is similar to SQL's "having aggregation < value",
|
||||
* except that an aggregation in SQL is an expression instead of an aggregation name as in Druid.
|
||||
*/
|
||||
public class LessThanHavingSpec implements HavingSpec
|
||||
public class LessThanHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x5;
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ import io.druid.data.input.Row;
|
|||
/**
|
||||
* A "having" spec that always evaluates to false
|
||||
*/
|
||||
public class NeverHavingSpec implements HavingSpec
|
||||
public class NeverHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x1;
|
||||
|
||||
|
|
|
@ -22,13 +22,15 @@ package io.druid.query.groupby.having;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The logical "not" operator for the "having" clause.
|
||||
*/
|
||||
public class NotHavingSpec implements HavingSpec
|
||||
public class NotHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x6;
|
||||
|
||||
|
@ -46,6 +48,12 @@ public class NotHavingSpec implements HavingSpec
|
|||
return havingSpec;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRowSignature(Map<String, ValueType> rowSignature)
|
||||
{
|
||||
havingSpec.setRowSignature(rowSignature);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
|
|
|
@ -23,14 +23,16 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The logical "or" operator for the "having" clause.
|
||||
*/
|
||||
public class OrHavingSpec implements HavingSpec
|
||||
public class OrHavingSpec extends BaseHavingSpec
|
||||
{
|
||||
private static final byte CACHE_KEY = 0x7;
|
||||
|
||||
|
@ -48,6 +50,14 @@ public class OrHavingSpec implements HavingSpec
|
|||
return havingSpecs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRowSignature(Map<String, ValueType> rowSignature)
|
||||
{
|
||||
for (HavingSpec havingSpec : havingSpecs) {
|
||||
havingSpec.setRowSignature(rowSignature);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean eval(Row row)
|
||||
{
|
||||
|
|
|
@@ -48,6 +48,7 @@ import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryHelper;
import io.druid.query.groupby.epinephelinae.GroupByBinaryFnV2;
import io.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2;
import io.druid.query.groupby.epinephelinae.GroupByQueryEngineV2;

@@ -207,6 +208,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
    final Sequence<Row> results = GroupByRowProcessor.process(
        query,
        subqueryResult,
        GroupByQueryHelper.rowSignatureFor(subquery),
        configSupplier.get(),
        mergeBufferPool,
        spillMapper
|
@ -20,9 +20,8 @@
|
|||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.topn.types.TopNStrategyFactory;
|
||||
import io.druid.segment.Cursor;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
|
@@ -47,19 +46,19 @@ public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
  @SuppressWarnings("unchecked")
  public Result<TopNResultValue> apply(Cursor cursor)
  {
    final ColumnSelectorPlus[] selectorPlusArray = DimensionHandlerUtils.createColumnSelectorPluses(
    final ColumnSelectorPlus selectorPlus = DimensionHandlerUtils.createColumnSelectorPlus(
        STRATEGY_FACTORY,
        Lists.newArrayList(query.getDimensionSpec()),
        query.getDimensionSpec(),
        cursor
    );

    if (selectorPlusArray[0].getSelector() == null) {
    if (selectorPlus.getSelector() == null) {
      return null;
    }

    TopNParams params = null;
    try {
      params = topNAlgorithm.makeInitParams(selectorPlusArray[0], cursor);
      params = topNAlgorithm.makeInitParams(selectorPlus, cursor);

      TopNResultBuilder resultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, query);
@ -19,12 +19,13 @@
|
|||
|
||||
package io.druid.segment;
|
||||
|
||||
import io.druid.java.util.common.IAE;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.dimension.ColumnSelectorStrategy;
|
||||
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ValueType;
|
||||
|
@@ -63,6 +64,27 @@ public final class DimensionHandlerUtils
    return new StringDimensionHandler(dimensionName, multiValueHandling);
  }

  /**
   * Convenience function equivalent to calling
   * {@link #createColumnSelectorPluses(ColumnSelectorStrategyFactory, List, ColumnSelectorFactory)} with a singleton
   * list of dimensionSpecs and then retrieving the only element in the returned array.
   *
   * @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
   * @param strategyFactory A factory provided by query engines that generates type-handling strategies
   * @param dimensionSpec column to generate a ColumnSelectorPlus object for
   * @param cursor Used to create value selectors for columns.
   *
   * @return A ColumnSelectorPlus object
   */
  public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorPlus<ColumnSelectorStrategyClass> createColumnSelectorPlus(
      ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
      DimensionSpec dimensionSpec,
      ColumnSelectorFactory cursor
  )
  {
    return createColumnSelectorPluses(strategyFactory, ImmutableList.of(dimensionSpec), cursor)[0];
  }

  /**
   * Creates an array of ColumnSelectorPlus objects, selectors that handle type-specific operations within
   * query processing engines, using a strategy factory provided by the query engine. One ColumnSelectorPlus
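For engines that handle exactly one dimension this removes the wrap-in-a-list-then-take-element-zero step. A hedged before/after sketch using the value-matcher strategy factory that appears later in this change ("page" and columnSelectorFactory are placeholders):

  // Before: singleton list in, array out, take element zero.
  ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> plus =
      DimensionHandlerUtils.createColumnSelectorPluses(
          ValueMatcherColumnSelectorStrategyFactory.instance(),
          ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of("page")),
          columnSelectorFactory
      )[0];

  // After: ask for exactly one.
  ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> samePlus =
      DimensionHandlerUtils.createColumnSelectorPlus(
          ValueMatcherColumnSelectorStrategyFactory.instance(),
          DefaultDimensionSpec.of("page"),
          columnSelectorFactory
      );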
@ -22,8 +22,6 @@ package io.druid.segment;
|
|||
import io.druid.collections.bitmap.BitmapFactory;
|
||||
import io.druid.collections.bitmap.MutableBitmap;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
|
@ -210,7 +208,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
|
|||
* @param desc Descriptor object for this dimension within an IncrementalIndex
|
||||
* @return A new object that reads rows from currEntry
|
||||
*/
|
||||
Object makeColumnValueSelector(
|
||||
ColumnValueSelector makeColumnValueSelector(
|
||||
DimensionSpec spec,
|
||||
IncrementalIndexStorageAdapter.EntryHolder currEntry,
|
||||
IncrementalIndex.DimensionDesc desc
|
||||
|
@ -308,46 +306,4 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
|
|||
* @param factory bitmap factory
|
||||
*/
|
||||
void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory);
|
||||
|
||||
|
||||
/**
|
||||
* Return a ValueMatcher that accepts an EntryHolder containing the current TimeAndDims key and the array index of this
|
||||
* indexer's dimension within the TimeAndDims key.
|
||||
*
|
||||
* The implementer should read the dimension array Object from the TimeAndDims key and cast it to the appropriate
|
||||
* type, as described in the documentation for compareUnsortedEncodedArrays().
|
||||
*
|
||||
* The returned ValueMatcher should match the dimension values against matchValue.
|
||||
*
|
||||
* See StringDimensionIndexer for a reference implementation.
|
||||
*
|
||||
* @param matchValue value to match on
|
||||
* @param holder holds the current TimeAndDims key during row iteration
|
||||
* @param dimIndex the array index of this indexer's dimension within the TimeAndDims key
|
||||
* @return A ValueMatcher that matches a dimension value array from a TimeAndDims key against "matchValue"
|
||||
*/
|
||||
ValueMatcher makeIndexingValueMatcher(String matchValue, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex);
|
||||
|
||||
/**
|
||||
* Return a ValueMatcher that accepts an EntryHolder containing the current TimeAndDims key and the array index of this
|
||||
* indexer's dimension within the TimeAndDims key.
|
||||
*
|
||||
* The implementer should read the dimension array Object from the TimeAndDims key and cast it to the appropriate
|
||||
* type, as described in the documentation for compareUnsortedEncodedArrays().
|
||||
*
|
||||
* Based on the type of the indexer, this method should get a predicate of the same type from the supplied
|
||||
* predicateFactory.
|
||||
*
|
||||
* For example, a StringDimensionIndexer would call predicateFactory.makeStringPredicate().
|
||||
*
|
||||
* The returned ValueMatcher should apply the generated predicate to the dimension values.
|
||||
*
|
||||
* See StringDimensionIndexer for a reference implementation.
|
||||
*
|
||||
* @param predicateFactory Factory object that can generate predicates for each supported dimension type
|
||||
* @param holder holds the current TimeAndDims key during row iteration
|
||||
* @param dimIndex the array index of this indexer's dimension within the TimeAndDims key
|
||||
* @return A ValueMatcher that applies a predicate from the predicateFactory to the dimension values in the TimeAndDim keys
|
||||
*/
|
||||
ValueMatcher makeIndexingValueMatcher(DruidPredicateFactory predicateFactory, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex);
|
||||
}
|
||||
|
|
|
@ -21,47 +21,14 @@ package io.druid.segment;
|
|||
|
||||
import com.google.common.base.Strings;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterator;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterators;
|
||||
|
||||
import java.io.IOException;
|
||||
import io.druid.segment.data.ZeroIndexedInts;
|
||||
|
||||
public class NullDimensionSelector implements DimensionSelector
|
||||
{
|
||||
|
||||
private static final IndexedInts SINGLETON = new IndexedInts() {
|
||||
@Override
|
||||
public int size() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntIterator iterator() {
|
||||
return IntIterators.singleton(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fill(int index, int[] toFill)
|
||||
{
|
||||
throw new UnsupportedOperationException("NullDimensionSelector does not support fill");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
return SINGLETON;
|
||||
return ZeroIndexedInts.instance();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,27 +27,19 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.io.Closer;
|
||||
|
||||
import io.druid.collections.bitmap.ImmutableBitmap;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.java.util.common.guava.Sequence;
|
||||
import io.druid.java.util.common.guava.Sequences;
|
||||
import io.druid.math.expr.Expr;
|
||||
import io.druid.math.expr.Parser;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.QueryInterruptedException;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.BooleanFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
|
||||
import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.column.Column;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
|
@ -61,7 +53,6 @@ import io.druid.segment.data.IndexedInts;
|
|||
import io.druid.segment.data.Offset;
|
||||
import io.druid.segment.filter.AndFilter;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.filter.Filters;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterators;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -927,10 +918,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
} else {
|
||||
return new QueryableIndexBaseCursor()
|
||||
{
|
||||
CursorOffsetHolderValueMatcherFactory valueMatcherFactory = new CursorOffsetHolderValueMatcherFactory(
|
||||
storageAdapter,
|
||||
this
|
||||
);
|
||||
RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory(
|
||||
cursorOffsetHolder,
|
||||
descending
|
||||
|
@ -942,7 +929,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
if (postFilter instanceof BooleanFilter) {
|
||||
filterMatcher = ((BooleanFilter) postFilter).makeMatcher(
|
||||
bitmapIndexSelector,
|
||||
valueMatcherFactory,
|
||||
this,
|
||||
rowOffsetMatcherFactory
|
||||
);
|
||||
} else {
|
||||
|
@ -950,7 +937,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
filterMatcher = rowOffsetMatcherFactory.makeRowOffsetMatcher(postFilter.getBitmapIndex(
|
||||
bitmapIndexSelector));
|
||||
} else {
|
||||
filterMatcher = postFilter.makeMatcher(valueMatcherFactory);
|
||||
filterMatcher = postFilter.makeMatcher(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1040,83 +1027,6 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
|
|||
}
|
||||
}
|
||||
|
||||
private static class CursorOffsetHolderValueMatcherFactory implements ValueMatcherFactory
|
||||
{
|
||||
private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY =
|
||||
new ValueMatcherColumnSelectorStrategyFactory();
|
||||
|
||||
private final StorageAdapter storageAdapter;
|
||||
private final ColumnSelectorFactory cursor;
|
||||
private final List<String> availableMetrics;
|
||||
|
||||
public CursorOffsetHolderValueMatcherFactory(
|
||||
StorageAdapter storageAdapter,
|
||||
ColumnSelectorFactory cursor
|
||||
)
|
||||
{
|
||||
this.storageAdapter = storageAdapter;
|
||||
this.cursor = cursor;
|
||||
this.availableMetrics = Lists.newArrayList(storageAdapter.getAvailableMetrics());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(String dimension, final String value)
|
||||
{
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) {
|
||||
if (getTypeForDimension(dimension) == ValueType.LONG) {
|
||||
return Filters.getLongValueMatcher(
|
||||
cursor.makeLongColumnSelector(dimension),
|
||||
value
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
|
||||
DimensionHandlerUtils.createColumnSelectorPluses(
|
||||
STRATEGY_FACTORY,
|
||||
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
|
||||
cursor
|
||||
);
|
||||
|
||||
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
|
||||
return strategy.getValueMatcher(dimension, cursor, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory)
|
||||
{
|
||||
if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) {
|
||||
if (getTypeForDimension(dimension) == ValueType.LONG) {
|
||||
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
|
||||
}
|
||||
}
|
||||
|
||||
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
|
||||
DimensionHandlerUtils.createColumnSelectorPluses(
|
||||
STRATEGY_FACTORY,
|
||||
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
|
||||
cursor
|
||||
);
|
||||
|
||||
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
|
||||
return strategy.getValueMatcher(dimension, cursor, predicateFactory);
|
||||
}
|
||||
|
||||
private ValueMatcher makeLongValueMatcher(String dimension, final DruidLongPredicate predicate)
|
||||
{
|
||||
return Filters.getLongPredicateMatcher(
|
||||
cursor.makeLongColumnSelector(dimension),
|
||||
predicate
|
||||
);
|
||||
}
|
||||
|
||||
private ValueType getTypeForDimension(String dimension)
|
||||
{
|
||||
ColumnCapabilities capabilities = cursor.getColumnCapabilities(dimension);
|
||||
return capabilities == null ? ValueType.STRING : capabilities.getType();
|
||||
}
|
||||
}
|
||||
|
||||
private static class CursorOffsetHolderRowOffsetMatcherFactory implements RowOffsetMatcherFactory
|
||||
{
|
||||
private final CursorOffsetHolder holder;
|
||||
|
|
|
@ -20,23 +20,18 @@
|
|||
package io.druid.segment;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.collections.bitmap.BitmapFactory;
|
||||
import io.druid.collections.bitmap.MutableBitmap;
|
||||
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.segment.data.Indexed;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.data.IndexedIterable;
|
||||
import io.druid.segment.filter.BooleanValueMatcher;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import it.unimi.dsi.fastutil.ints.IntArrayList;
|
||||
|
@ -379,7 +374,7 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
}
|
||||
|
||||
@Override
|
||||
public Object makeColumnValueSelector(
|
||||
public DimensionSelector makeColumnValueSelector(
|
||||
final DimensionSpec spec,
|
||||
final IncrementalIndexStorageAdapter.EntryHolder currEntry,
|
||||
final IncrementalIndex.DimensionDesc desc
|
||||
|
@ -404,13 +399,19 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
indices = null;
|
||||
}
|
||||
|
||||
int nullId = getEncodedValue(null, false);
|
||||
IntList valsTmp = null;
|
||||
if ((indices == null || indices.length == 0) && nullId > -1) {
|
||||
if (nullId < maxId) {
|
||||
valsTmp = IntLists.singleton(nullId);
|
||||
if (indices == null || indices.length == 0) {
|
||||
final int nullId = getEncodedValue(null, false);
|
||||
if (nullId > -1) {
|
||||
if (nullId < maxId) {
|
||||
valsTmp = IntLists.singleton(nullId);
|
||||
} else {
|
||||
valsTmp = IntLists.EMPTY_LIST;
|
||||
}
|
||||
}
|
||||
} else if (indices != null && indices.length > 0) {
|
||||
}
|
||||
|
||||
if (valsTmp == null && indices != null && indices.length > 0) {
|
||||
valsTmp = new IntArrayList(indices.length);
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
int id = indices[i];
|
||||
|
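The reshuffled branch only looks up the dictionary id of null when the row really has no values for this dimension, and otherwise falls through to copying the stored indices. A condensed, hedged sketch of that decision (the helper and the treatment of ids at or beyond maxId are illustrative, not lifted from the class):

  // Sketch only: dictionaryIdOfNull stands in for getEncodedValue(null, false);
  // maxId bounds the dictionary ids visible to the current cursor.
  static IntList selectRowIds(int[] indices, int dictionaryIdOfNull, int maxId)
  {
    if (indices == null || indices.length == 0) {
      if (dictionaryIdOfNull > -1) {
        // An empty row reads as the null value, but only if null is already visible to this cursor.
        return dictionaryIdOfNull < maxId ? IntLists.singleton(dictionaryIdOfNull) : IntLists.EMPTY_LIST;
      }
      return IntLists.EMPTY_LIST;  // assumption: nothing to report when null never entered the dictionary
    }
    final IntArrayList vals = new IntArrayList(indices.length);
    for (int id : indices) {
      if (id < maxId) {            // assumption: ids not yet visible to the cursor are skipped
        vals.add(id);
      }
    }
    return vals;
  }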
@ -534,79 +535,6 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeIndexingValueMatcher(
|
||||
final String matchValue,
|
||||
final IncrementalIndexStorageAdapter.EntryHolder holder,
|
||||
final int dimIndex
|
||||
)
|
||||
{
|
||||
final int encodedVal = getEncodedValue(matchValue, false);
|
||||
final boolean matchOnNull = Strings.isNullOrEmpty(matchValue);
|
||||
if (encodedVal < 0 && !matchOnNull) {
|
||||
return new BooleanValueMatcher(false);
|
||||
}
|
||||
|
||||
return new ValueMatcher()
|
||||
{
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
Object[] dims = holder.getKey().getDims();
|
||||
if (dimIndex >= dims.length) {
|
||||
return matchOnNull;
|
||||
}
|
||||
|
||||
int[] dimsInt = (int[]) dims[dimIndex];
|
||||
if (dimsInt == null || dimsInt.length == 0) {
|
||||
return matchOnNull;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dimsInt.length; i++) {
|
||||
if (dimsInt[i] == encodedVal) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeIndexingValueMatcher(
|
||||
final DruidPredicateFactory predicateFactory,
|
||||
final IncrementalIndexStorageAdapter.EntryHolder holder,
|
||||
final int dimIndex
|
||||
)
|
||||
{
|
||||
final Predicate<String> predicate = predicateFactory.makeStringPredicate();
|
||||
final boolean matchOnNull = predicate.apply(null);
|
||||
return new ValueMatcher()
|
||||
{
|
||||
@Override
|
||||
public boolean matches()
|
||||
{
|
||||
Object[] dims = holder.getKey().getDims();
|
||||
if (dimIndex >= dims.length) {
|
||||
return matchOnNull;
|
||||
}
|
||||
|
||||
int[] dimsInt = (int[]) dims[dimIndex];
|
||||
if (dimsInt == null || dimsInt.length == 0) {
|
||||
return matchOnNull;
|
||||
}
|
||||
|
||||
for (int i = 0; i < dimsInt.length; i++) {
|
||||
String finalDimVal = getActualValue(dimsInt[i], false);
|
||||
if (predicate.apply(finalDimVal)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SortedDimensionDictionary sortedLookup()
|
||||
{
|
||||
return sortedLookup == null ? sortedLookup = dimLookup.sort() : sortedLookup;
|
||||
|
|
|
@@ -0,0 +1,91 @@
/* Apache License, Version 2.0 header */
package io.druid.segment.data;

import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;

import java.io.IOException;

/**
 * An IndexedInts that always returns [0, 1, ..., N].
 */
public class RangeIndexedInts implements IndexedInts
{
  private static final int CACHE_LIMIT = 8;
  private static final RangeIndexedInts[] CACHE = new RangeIndexedInts[CACHE_LIMIT];

  static {
    for (int i = 0; i < CACHE_LIMIT; i++) {
      CACHE[i] = new RangeIndexedInts(i);
    }
  }

  private final int size;

  private RangeIndexedInts(int size)
  {
    this.size = size;
  }

  public static RangeIndexedInts create(final int size)
  {
    Preconditions.checkArgument(size >= 0, "size >= 0");
    if (size < CACHE_LIMIT) {
      return CACHE[size];
    } else {
      return new RangeIndexedInts(size);
    }
  }

  @Override
  public int size()
  {
    return size;
  }

  @Override
  public int get(int index)
  {
    if (index < 0 || index >= size) {
      throw new IndexOutOfBoundsException("index: " + index);
    }
    return index;
  }

  @Override
  public void fill(int index, int[] toFill)
  {
    throw new UnsupportedOperationException("fill");
  }

  @Override
  public IntIterator iterator()
  {
    return IntIterators.fromTo(0, size);
  }

  @Override
  public void close() throws IOException
  {

  }
}
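This presumably backs the row-based dimension selectors that previously built a throwaway anonymous IndexedInts of the form [0, 1, ..., size - 1] per row (see the IncrementalIndex hunk further down). A short usage sketch:

  // Expose a row with three multi-value entries; sizes under the small cache limit reuse shared instances.
  IndexedInts row = RangeIndexedInts.create(3);
  for (int i = 0; i < row.size(); i++) {
    int id = row.get(i);  // yields 0, 1, 2
  }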
@@ -0,0 +1,75 @@
/* Apache License, Version 2.0 header */
package io.druid.segment.data;

import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntIterators;

import java.io.IOException;

/**
 * An IndexedInts that always returns a row containing a single zero.
 */
public class ZeroIndexedInts implements IndexedInts
{
  private final static ZeroIndexedInts INSTANCE = new ZeroIndexedInts();

  private ZeroIndexedInts()
  {
    // Singleton.
  }

  public static ZeroIndexedInts instance()
  {
    return INSTANCE;
  }

  @Override
  public int size()
  {
    return 1;
  }

  @Override
  public int get(int index)
  {
    // Skip range check in production, assume "index" was 0 like it really should have been.
    assert index == 0;
    return 0;
  }

  @Override
  public void fill(int index, int[] toFill)
  {
    throw new UnsupportedOperationException("fill");
  }

  @Override
  public IntIterator iterator()
  {
    return IntIterators.singleton(0);
  }

  @Override
  public void close() throws IOException
  {

  }
}
@ -27,7 +27,7 @@ import io.druid.query.filter.BooleanFilter;
|
|||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -73,7 +73,7 @@ public class AndFilter implements BooleanFilter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
if (filters.size() == 0) {
|
||||
return new BooleanValueMatcher(false);
|
||||
|
@ -90,7 +90,7 @@ public class AndFilter implements BooleanFilter
|
|||
@Override
|
||||
public ValueMatcher makeMatcher(
|
||||
BitmapIndexSelector selector,
|
||||
ValueMatcherFactory valueMatcherFactory,
|
||||
ColumnSelectorFactory columnSelectorFactory,
|
||||
RowOffsetMatcherFactory rowOffsetMatcherFactory
|
||||
)
|
||||
{
|
||||
|
@ -101,7 +101,7 @@ public class AndFilter implements BooleanFilter
|
|||
if (filter.supportsBitmapIndex(selector)) {
|
||||
bitmaps.add(filter.getBitmapIndex(selector));
|
||||
} else {
|
||||
ValueMatcher matcher = filter.makeMatcher(valueMatcherFactory);
|
||||
ValueMatcher matcher = filter.makeMatcher(columnSelectorFactory);
|
||||
matchers.add(matcher);
|
||||
}
|
||||
}
|
||||
|
|
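Filters now build their row matchers straight off a ColumnSelectorFactory rather than the removed ValueMatcherFactory. A hedged sketch of a custom filter written against the new signature (the class is hypothetical and its bitmap-index methods are omitted):

  public class ExactValueFilter implements Filter
  {
    private final String dimension;
    private final String value;

    public ExactValueFilter(String dimension, String value)
    {
      this.dimension = dimension;
      this.value = value;
    }

    @Override
    public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
    {
      // The shared helper checks the column's capabilities and picks a long- or string-based matcher.
      return Filters.makeValueMatcher(factory, dimension, value);
    }
  }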
|
@ -29,8 +29,8 @@ import io.druid.query.filter.DruidLongPredicate;
|
|||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.query.ordering.StringComparators;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
@ -132,9 +132,9 @@ public class BoundFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(boundDimFilter.getDimension(), getPredicateFactory());
|
||||
return Filters.makeValueMatcher(factory, boundDimFilter.getDimension(), getPredicateFactory());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,7 +28,7 @@ import io.druid.query.filter.DruidLongPredicate;
|
|||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -93,9 +93,9 @@ public class DimensionPredicateFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(dimension, predicateFactory);
|
||||
return Filters.makeValueMatcher(factory, dimension, predicateFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -28,15 +28,23 @@ import com.google.common.collect.Lists;
|
|||
import io.druid.collections.bitmap.ImmutableBitmap;
|
||||
import io.druid.common.guava.GuavaUtils;
|
||||
import io.druid.java.util.common.guava.FunctionalIterable;
|
||||
import io.druid.query.ColumnSelectorPlus;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.BooleanFilter;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.DruidLongPredicate;
|
||||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
|
||||
import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
import io.druid.segment.LongColumnSelector;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.Indexed;
|
||||
|
||||
|
@@ -89,6 +97,84 @@ public class Filters
    return dimFilter == null ? null : dimFilter.toFilter();
  }

  /**
   * Create a ValueMatcher that compares row values to the provided string.
   *
   * An implementation of this method should be able to handle dimensions of various types.
   *
   * @param columnSelectorFactory Selector for columns.
   * @param columnName The column to filter.
   * @param value The value to match against, represented as a String.
   *
   * @return An object that matches row values on the provided value.
   */
  public static ValueMatcher makeValueMatcher(
      final ColumnSelectorFactory columnSelectorFactory,
      final String columnName,
      final String value
  )
  {
    final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName);

    // This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns.
    if (capabilities != null && capabilities.getType() == ValueType.LONG) {
      return getLongValueMatcher(
          columnSelectorFactory.makeLongColumnSelector(columnName),
          value
      );
    }

    final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> selector =
        DimensionHandlerUtils.createColumnSelectorPlus(
            ValueMatcherColumnSelectorStrategyFactory.instance(),
            DefaultDimensionSpec.of(columnName),
            columnSelectorFactory
        );

    return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), value);
  }

  /**
   * Create a ValueMatcher that applies a predicate to row values.
   *
   * The caller provides a predicate factory that can create a predicate for each value type supported by Druid.
   * See {@link DruidPredicateFactory} for more information.
   *
   * When creating the ValueMatcher, the ValueMatcherFactory implementation should decide what type of predicate
   * to create from the predicate factory based on the ValueType of the specified dimension.
   *
   * @param columnSelectorFactory Selector for columns.
   * @param columnName The column to filter.
   * @param predicateFactory Predicate factory
   *
   * @return An object that applies a predicate to row values
   */
  public static ValueMatcher makeValueMatcher(
      final ColumnSelectorFactory columnSelectorFactory,
      final String columnName,
      final DruidPredicateFactory predicateFactory
  )
  {
    final ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(columnName);

    // This should be folded into the ValueMatcherColumnSelectorStrategy once that can handle LONG typed columns.
    if (capabilities != null && capabilities.getType() == ValueType.LONG) {
      return getLongPredicateMatcher(
          columnSelectorFactory.makeLongColumnSelector(columnName),
          predicateFactory.makeLongPredicate()
      );
    }

    final ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy> selector =
        DimensionHandlerUtils.createColumnSelectorPlus(
            ValueMatcherColumnSelectorStrategyFactory.instance(),
            DefaultDimensionSpec.of(columnName),
            columnSelectorFactory
        );

    return selector.getColumnSelectorStrategy().makeValueMatcher(selector.getSelector(), predicateFactory);
  }

  public static ImmutableBitmap allFalse(final BitmapIndexSelector selector)
  {
    return selector.getBitmapFactory().makeEmptyImmutableBitmap();
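Both helpers consult ColumnCapabilities and special-case LONG columns until the strategy can do that itself; everything else goes through a ColumnSelectorPlus built with the value-matcher strategy. A hedged usage sketch of the predicate flavor, assuming DruidPredicateFactory exposes only the string and long factory methods used above and DruidLongPredicate is a single applyLong(long) predicate; the matcher below accepts rows where the placeholder column has any non-null value:

  ValueMatcher hasValue = Filters.makeValueMatcher(
      columnSelectorFactory,   // whatever the cursor provides
      "added",                 // placeholder column name
      new DruidPredicateFactory()
      {
        @Override
        public Predicate<String> makeStringPredicate()
        {
          return new Predicate<String>()
          {
            @Override
            public boolean apply(String input)
            {
              return input != null;
            }
          };
        }

        @Override
        public DruidLongPredicate makeLongPredicate()
        {
          return new DruidLongPredicate()
          {
            @Override
            public boolean applyLong(long input)
            {
              return true;  // a long column always has a value
            }
          };
        }
      }
  );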
@ -31,7 +31,7 @@ import io.druid.query.filter.DruidLongPredicate;
|
|||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -83,9 +83,9 @@ public class InFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(dimension, getPredicateFactory());
|
||||
return Filters.makeValueMatcher(factory, dimension, getPredicateFactory());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -25,7 +25,7 @@ import io.druid.query.filter.BitmapIndexSelector;
|
|||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.JavaScriptDimFilter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import org.mozilla.javascript.Context;
|
||||
|
||||
public class JavaScriptFilter implements Filter
|
||||
|
@ -64,10 +64,10 @@ public class JavaScriptFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
// suboptimal, since we need to create one context per call to predicate.apply()
|
||||
return factory.makeValueMatcher(dimension, predicateFactory);
|
||||
return Filters.makeValueMatcher(factory, dimension, predicateFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -20,14 +20,13 @@
|
|||
package io.druid.segment.filter;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import io.druid.collections.bitmap.ImmutableBitmap;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.LikeDimFilter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.column.BitmapIndex;
|
||||
import io.druid.segment.data.Indexed;
|
||||
|
||||
|
@ -131,9 +130,9 @@ public class LikeFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(dimension, likeMatcher.predicateFactory(extractionFn));
|
||||
return Filters.makeValueMatcher(factory, dimension, likeMatcher.predicateFactory(extractionFn));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,7 +23,7 @@ import io.druid.collections.bitmap.ImmutableBitmap;
|
|||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -48,7 +48,7 @@ public class NotFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
final ValueMatcher baseMatcher = baseFilter.makeMatcher(factory);
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import io.druid.query.filter.BooleanFilter;
|
|||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.RowOffsetMatcherFactory;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -67,7 +67,7 @@ public class OrFilter implements BooleanFilter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
final ValueMatcher[] matchers = new ValueMatcher[filters.size()];
|
||||
|
||||
|
@ -80,7 +80,7 @@ public class OrFilter implements BooleanFilter
|
|||
@Override
|
||||
public ValueMatcher makeMatcher(
|
||||
BitmapIndexSelector selector,
|
||||
ValueMatcherFactory valueMatcherFactory,
|
||||
ColumnSelectorFactory columnSelectorFactory,
|
||||
RowOffsetMatcherFactory rowOffsetMatcherFactory
|
||||
)
|
||||
{
|
||||
|
@ -91,7 +91,7 @@ public class OrFilter implements BooleanFilter
|
|||
if (filter.supportsBitmapIndex(selector)) {
|
||||
bitmaps.add(filter.getBitmapIndex(selector));
|
||||
} else {
|
||||
ValueMatcher matcher = filter.makeMatcher(valueMatcherFactory);
|
||||
ValueMatcher matcher = filter.makeMatcher(columnSelectorFactory);
|
||||
matchers.add(matcher);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ import io.druid.collections.bitmap.ImmutableBitmap;
|
|||
import io.druid.query.filter.BitmapIndexSelector;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -48,9 +48,9 @@ public class SelectorFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(dimension, value);
|
||||
return Filters.makeValueMatcher(factory, dimension, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,7 +27,7 @@ import io.druid.query.filter.DruidLongPredicate;
|
|||
import io.druid.query.filter.DruidPredicateFactory;
|
||||
import io.druid.query.filter.Filter;
|
||||
import io.druid.query.filter.ValueMatcher;
|
||||
import io.druid.query.filter.ValueMatcherFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.incremental.SpatialDimensionRowTransformer;
|
||||
|
||||
/**
|
||||
|
@ -54,9 +54,10 @@ public class SpatialFilter implements Filter
|
|||
}
|
||||
|
||||
@Override
|
||||
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
|
||||
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
|
||||
{
|
||||
return factory.makeValueMatcher(
|
||||
return Filters.makeValueMatcher(
|
||||
factory,
|
||||
dimension,
|
||||
new DruidPredicateFactory()
|
||||
{
|
||||
|
|
|
@ -38,15 +38,12 @@ import io.druid.data.input.impl.DimensionSchema;
|
|||
import io.druid.data.input.impl.DimensionsSpec;
|
||||
import io.druid.data.input.impl.SpatialDimensionSchema;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.math.expr.Evals;
|
||||
import io.druid.math.expr.Expr;
|
||||
import io.druid.math.expr.Parser;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.extraction.ExtractionFn;
|
||||
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionHandler;
|
||||
import io.druid.segment.DimensionHandlerUtils;
|
||||
|
@ -61,19 +58,15 @@ import io.druid.segment.column.Column;
|
|||
import io.druid.segment.column.ColumnCapabilities;
|
||||
import io.druid.segment.column.ColumnCapabilitiesImpl;
|
||||
import io.druid.segment.column.ValueType;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import io.druid.segment.serde.ComplexMetricExtractor;
|
||||
import io.druid.segment.serde.ComplexMetricSerde;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterator;
|
||||
import it.unimi.dsi.fastutil.ints.IntIterators;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.annotation.concurrent.GuardedBy;
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Array;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
|
@ -111,46 +104,22 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
public static ColumnSelectorFactory makeColumnSelectorFactory(
|
||||
final AggregatorFactory agg,
|
||||
final Supplier<InputRow> in,
|
||||
final boolean deserializeComplexMetrics,
|
||||
final Map<String, ColumnCapabilitiesImpl> columnCapabilities
|
||||
final boolean deserializeComplexMetrics
|
||||
)
|
||||
{
|
||||
final RowBasedColumnSelectorFactory baseSelectorFactory = RowBasedColumnSelectorFactory.create(in, null);
|
||||
return new ColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public LongColumnSelector makeLongColumnSelector(final String columnName)
|
||||
{
|
||||
if (columnName.equals(Column.TIME_COLUMN_NAME)) {
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return in.get().getTimestampFromEpoch();
|
||||
}
|
||||
};
|
||||
}
|
||||
return new LongColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public long get()
|
||||
{
|
||||
return in.get().getLongMetric(columnName);
|
||||
}
|
||||
};
|
||||
return baseSelectorFactory.makeLongColumnSelector(columnName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(final String columnName)
|
||||
{
|
||||
return new FloatColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public float get()
|
||||
{
|
||||
return in.get().getFloatMetric(columnName);
|
||||
}
|
||||
};
|
||||
return baseSelectorFactory.makeFloatColumnSelector(columnName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -158,20 +127,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
{
|
||||
final String typeName = agg.getTypeName();
|
||||
|
||||
final ObjectColumnSelector<Object> rawColumnSelector = new ObjectColumnSelector<Object>()
|
||||
{
|
||||
@Override
|
||||
public Class classOfObject()
|
||||
{
|
||||
return Object.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return in.get().getRaw(column);
|
||||
}
|
||||
};
|
||||
final ObjectColumnSelector rawColumnSelector = baseSelectorFactory.makeObjectColumnSelector(column);
|
||||
|
||||
if ((Enums.getIfPresent(ValueType.class, typeName.toUpperCase()).isPresent() && !typeName.equalsIgnoreCase(ValueType.COMPLEX.name()))
|
||||
|| !deserializeComplexMetrics) {
|
||||
|
@ -201,129 +157,21 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
|
|||
}
|
||||
|
||||
@Override
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec)
|
||||
{
|
||||
// This ColumnSelectorFactory implementation has no knowledge of column capabilities.
|
||||
// However, this method may still be called by FilteredAggregatorFactory's ValueMatcherFactory
|
||||
// to check column types.
|
||||
// If column capabilities are not available, return null, the caller will assume default types in that case.
|
||||
return columnCapabilities == null ? null : columnCapabilities.get(columnName);
|
||||
return baseSelectorFactory.makeDimensionSelector(dimensionSpec);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(
|
||||
DimensionSpec dimensionSpec
|
||||
)
|
||||
public ColumnCapabilities getColumnCapabilities(String columnName)
|
||||
{
|
||||
return dimensionSpec.decorate(makeDimensionSelectorUndecorated(dimensionSpec));
|
||||
}
|
||||
|
||||
private DimensionSelector makeDimensionSelectorUndecorated(
|
||||
DimensionSpec dimensionSpec
|
||||
)
|
||||
{
|
||||
final String dimension = dimensionSpec.getDimension();
|
||||
final ExtractionFn extractionFn = dimensionSpec.getExtractionFn();
|
||||
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
final List<String> dimensionValues = in.get().getDimension(dimension);
|
||||
final int dimensionValuesSize = dimensionValues != null ? dimensionValues.size() : 0;
|
||||
|
||||
return new IndexedInts()
|
||||
{
|
||||
@Override
|
||||
public int size()
|
||||
{
|
||||
return dimensionValuesSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int get(int index)
|
||||
{
|
||||
if (index < 0 || index >= dimensionValuesSize) {
|
||||
throw new IndexOutOfBoundsException("index: " + index);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IntIterator iterator()
|
||||
{
|
||||
return IntIterators.fromTo(0, dimensionValuesSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fill(int index, int[] toFill)
|
||||
{
|
||||
throw new UnsupportedOperationException("fill not supported");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return DimensionSelector.CARDINALITY_UNKNOWN;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
final String value = in.get().getDimension(dimension).get(id);
|
||||
return extractionFn == null ? value : extractionFn.apply(value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
if (extractionFn != null) {
|
||||
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
|
||||
}
|
||||
return in.get().getDimension(dimension).indexOf(name);
|
||||
}
|
||||
};
|
||||
return baseSelectorFactory.getColumnCapabilities(columnName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NumericColumnSelector makeMathExpressionSelector(String expression)
|
||||
{
|
||||
final Expr parsed = Parser.parse(expression);
|
||||
|
||||
final List<String> required = Parser.findRequiredBindings(parsed);
|
||||
final Map<String, Supplier<Number>> values = Maps.newHashMapWithExpectedSize(required.size());
|
||||
|
||||
for (final String columnName : required) {
|
||||
values.put(
|
||||
columnName, new Supplier<Number>()
|
||||
{
|
||||
@Override
|
||||
public Number get()
|
||||
{
|
||||
return Evals.toNumber(in.get().getRaw(columnName));
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
final Expr.ObjectBinding binding = Parser.withSuppliers(values);
|
||||
|
||||
return new NumericColumnSelector()
|
||||
{
|
||||
@Override
|
||||
public Number get()
|
||||
{
|
||||
return parsed.eval(binding).numericValue();
|
||||
}
|
||||
};
|
||||
return baseSelectorFactory.makeMathExpressionSelector(expression);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
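The hand-rolled selectors above are gone; the same Supplier of input rows is wrapped in a RowBasedColumnSelectorFactory and every method simply delegates, with complex metrics keeping their special handling. Note the null second argument: with no row signature the factory cannot report column capabilities, so callers fall back to default (string) typing, exactly as the removed comment described. A brief hedged sketch of the two ways the factory is created in this change:

  // Ingestion-side aggregation: the row type is unknown up front, so no signature is passed.
  RowBasedColumnSelectorFactory untyped = RowBasedColumnSelectorFactory.create(in, null);

  // Having-spec / merge side: a signature is available, so the factory can report LONG/FLOAT
  // capabilities for numeric columns (rowSupplier is a SettableSupplier<Row>, rowSignature a
  // Map<String, ValueType>, as in DimFilterHavingSpec).
  RowBasedColumnSelectorFactory typed = RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature);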
@@ -20,27 +20,22 @@
package io.druid.segment.incremental;

import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import io.druid.granularity.QueryGranularity;
import io.druid.math.expr.Expr;
import io.druid.math.expr.Parser;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.math.expr.Expr;
import io.druid.math.expr.Parser;
import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionHandler;

@@ -63,11 +58,9 @@ import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.ListIndexed;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -245,13 +238,13 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
EntryHolder currEntry = new EntryHolder();

@Override
public Cursor apply(@Nullable final Long input)
public Cursor apply(final Long input)
{
final long timeStart = Math.max(input, actualInterval.getStartMillis());

return new Cursor()
{
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this, currEntry);
private final ValueMatcher filterMatcher = makeFilterMatcher(filter, this);
private Iterator<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> baseIter;
private Iterable<Map.Entry<IncrementalIndex.TimeAndDims, Integer>> cursorIterable;
private boolean emptyRange;

@@ -615,11 +608,11 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
);
}

private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor, final EntryHolder holder)
private ValueMatcher makeFilterMatcher(final Filter filter, final Cursor cursor)
{
return filter == null
? new BooleanValueMatcher(true)
: filter.makeMatcher(new CursorAndEntryHolderValueMatcherFactory(cursor, holder));
: filter.makeMatcher(cursor);
}

public static class EntryHolder

@@ -647,84 +640,6 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
}

private class CursorAndEntryHolderValueMatcherFactory implements ValueMatcherFactory
{
private final EntryHolder holder;
private final Cursor cursor;

public CursorAndEntryHolderValueMatcherFactory(
Cursor cursor,
EntryHolder holder
)
{
this.cursor = cursor;
this.holder = holder;
}

@Override
public ValueMatcher makeValueMatcher(String dimension, final String originalValue)
{
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
if (dimensionDesc == null) {
// filtering on long metrics and __time is supported as well
final Integer metricIndexInt = index.getMetricIndex(dimension);
if (metricIndexInt != null || dimension.equals(Column.TIME_COLUMN_NAME)) {
ValueType type = getTypeForDimension(dimension);
switch (type) {
case LONG:
return Filters.getLongValueMatcher(cursor.makeLongColumnSelector(dimension), originalValue);
default:
return new BooleanValueMatcher(Strings.isNullOrEmpty(originalValue));
}
} else {
return new BooleanValueMatcher(Strings.isNullOrEmpty(originalValue));
}
} else {
final DimensionIndexer indexer = dimensionDesc.getIndexer();
final int dimIndex = dimensionDesc.getIndex();
return indexer.makeIndexingValueMatcher(originalValue, holder, dimIndex);
}
}

@Override
public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory)
{
IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension);
if (dimensionDesc == null) {
// filtering on long metrics and __time is supported as well
final Integer metricIndexInt = index.getMetricIndex(dimension);
if (metricIndexInt != null || dimension.equals(Column.TIME_COLUMN_NAME)) {
ValueType type = getTypeForDimension(dimension);
switch (type) {
case LONG:
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
default:
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
}
} else {
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
}
} else {
final DimensionIndexer indexer = dimensionDesc.getIndexer();
final int dimIndex = dimensionDesc.getIndex();
return indexer.makeIndexingValueMatcher(predicateFactory, holder, dimIndex);
}
}

// for long metrics and __time
private ValueMatcher makeLongValueMatcher(String dimension, DruidLongPredicate predicate)
{
return Filters.getLongPredicateMatcher(cursor.makeLongColumnSelector(dimension), predicate);
}

private ValueType getTypeForDimension(String dimension)
{
ColumnCapabilities capabilities = index.getCapabilities(dimension);
return capabilities == null ? ValueType.STRING : capabilities.getType();
}
}

@Override
public Metadata getMetadata()
{

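With this change the Cursor, which already doubles as a ColumnSelectorFactory, is handed straight to Filter.makeMatcher, so the EntryHolder-based ValueMatcherFactory above becomes dead code. A minimal sketch of how a caller can drive the resulting matcher during a cursor scan; this is illustrative and not code from the patch, and the counting is a stand-in for real per-row work:

import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.segment.Cursor;
import io.druid.segment.filter.BooleanValueMatcher;

public class CursorMatcherSketch
{
  // Count how many rows under the cursor satisfy the filter (null filter matches everything).
  public static int countMatchingRows(final Cursor cursor, final Filter filter)
  {
    final ValueMatcher matcher = filter == null
                                 ? new BooleanValueMatcher(true)
                                 : filter.makeMatcher(cursor); // the Cursor is the ColumnSelectorFactory
    int count = 0;
    while (!cursor.isDone()) {
      if (matcher.matches()) {
        count++;
      }
      cursor.advance();
    }
    return count;
  }
}
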
@@ -177,8 +177,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
ColumnSelectorFactory columnSelectorFactory = makeColumnSelectorFactory(
agg,
rowSupplier,
deserializeComplexMetrics,
getColumnCapabilities()
deserializeComplexMetrics
);

selectors.put(

@@ -229,7 +228,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
getAggs()[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, getColumnCapabilities())
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
rowContainer.set(null);

@@ -158,7 +158,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
for (AggregatorFactory agg : metrics) {
selectors.put(
agg.getName(),
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, getColumnCapabilities()))
new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics))
);
}

@@ -91,6 +91,7 @@ import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.RegexDimFilter;
import io.druid.query.filter.SearchQueryDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.groupby.having.BaseHavingSpec;
import io.druid.query.groupby.having.DimFilterHavingSpec;
import io.druid.query.groupby.having.DimensionSelectorHavingSpec;
import io.druid.query.groupby.having.EqualToHavingSpec;

@@ -3366,7 +3367,7 @@ public class GroupByQueryRunnerTest
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(
new OrHavingSpec(
ImmutableList.of(
ImmutableList.<HavingSpec>of(
new GreaterThanHavingSpec("rows", 2L),
new EqualToHavingSpec("idx", 217L)
)

@@ -3495,7 +3496,7 @@ public class GroupByQueryRunnerTest
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(
new OrHavingSpec(
ImmutableList.of(
ImmutableList.<HavingSpec>of(
new GreaterThanHavingSpec("rows", 2L),
new EqualToHavingSpec("idx", 217L)
)

@@ -3604,7 +3605,7 @@ public class GroupByQueryRunnerTest
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setHavingSpec(
new OrHavingSpec(
ImmutableList.of(
ImmutableList.<HavingSpec>of(
new GreaterThanHavingSpec("rows_times_10", 20L),
new EqualToHavingSpec("idx", 217L)
)

@@ -4655,7 +4656,7 @@ public class GroupByQueryRunnerTest
)
)
.setHavingSpec(
new HavingSpec()
new BaseHavingSpec()
{
@Override
public boolean eval(Row row)

@@ -4926,7 +4927,7 @@ public class GroupByQueryRunnerTest
)
)
.setHavingSpec(
new HavingSpec()
new BaseHavingSpec()
{
@Override
public boolean eval(Row row)

@@ -0,0 +1,122 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.groupby.having;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedRow;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.column.ValueType;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.ExpectedException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DimFilterHavingSpecTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testSimple()
{
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "bar", null));
havingSpec.setRowSignature(null);

Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", "bar"))));
Assert.assertFalse(havingSpec.eval(new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", "baz"))));
}

@Test
public void testRowSignature()
{
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null));
havingSpec.setRowSignature(ImmutableMap.of("foo", ValueType.LONG));

Assert.assertTrue(havingSpec.eval(new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", 1L))));
Assert.assertFalse(havingSpec.eval(new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", 2L))));
}

@Test(timeout = 60_000L)
public void testConcurrentUsage() throws Exception
{
final ExecutorService exec = Executors.newFixedThreadPool(2);
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null));
final List<Future<?>> futures = new ArrayList<>();

for (int i = 0; i < 2; i++) {
final MapBasedRow row = new MapBasedRow(0, ImmutableMap.<String, Object>of("foo", String.valueOf(i)));
futures.add(
exec.submit(
new Runnable()
{
@Override
public void run()
{
havingSpec.setRowSignature(null);
while (!Thread.interrupted()) {
havingSpec.eval(row);
}
}
}
)
);
}

expectedException.expect(ExecutionException.class);
expectedException.expectCause(CoreMatchers.<IllegalStateException>instanceOf(IllegalStateException.class));
expectedException.expectCause(
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString("concurrent 'eval' calls not permitted"))
);

try {
for (Future<?> future : futures) {
future.get();
}
}
finally {
exec.shutdownNow();
}

// Not reached
Assert.assertTrue(false);
}

@Test
public void testSerde() throws Exception
{
final DimFilterHavingSpec havingSpec = new DimFilterHavingSpec(new SelectorDimFilter("foo", "1", null));
final ObjectMapper objectMapper = new DefaultObjectMapper();
Assert.assertEquals(
havingSpec,
objectMapper.readValue(objectMapper.writeValueAsBytes(havingSpec), HavingSpec.class)
);
}
}

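The testConcurrentUsage case above pins down the behavior described in the commit notes: DimFilterHavingSpec is not thread safe and must reject overlapping eval calls rather than race on its internal matcher state. A hypothetical sketch of that kind of reentrancy guard; the error message is taken from the test, but the surrounding class, field, and method shape are invented for illustration and are not the DimFilterHavingSpec source:

import java.util.concurrent.atomic.AtomicBoolean;

public class GuardedEvalSketch
{
  private final AtomicBoolean evalInProgress = new AtomicBoolean(false);

  public boolean eval(final boolean rowMatches)
  {
    // Refuse overlapping calls instead of silently corrupting mutable per-eval state.
    if (!evalInProgress.compareAndSet(false, true)) {
      throw new IllegalStateException("concurrent 'eval' calls not permitted");
    }
    try {
      return rowMatches; // stand-in for running the DimFilter's matcher against the row
    }
    finally {
      evalInProgress.set(false);
    }
  }
}
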
@@ -44,10 +44,10 @@ public class HavingSpecTest

@Test
public void testHavingClauseSerde() throws Exception {
List<HavingSpec> havings = Arrays.asList(
List<HavingSpec> havings = Arrays.<HavingSpec>asList(
new GreaterThanHavingSpec("agg", Double.valueOf(1.3)),
new OrHavingSpec(
Arrays.asList(
Arrays.<HavingSpec>asList(
new LessThanHavingSpec("lessAgg", Long.valueOf(1L)),
new NotHavingSpec(new EqualToHavingSpec("equalAgg", Double.valueOf(2)))
)

@@ -152,7 +152,7 @@ public class HavingSpecTest
assertFalse(spec.eval(getTestRow(Long.MAX_VALUE)));
}

private static class CountingHavingSpec implements HavingSpec {
private static class CountingHavingSpec extends BaseHavingSpec {

private final AtomicInteger counter;
private final boolean value;

@@ -23,8 +23,11 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.common.guava.SettableSupplier;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.InputRow;
import io.druid.granularity.QueryGranularities;

@@ -39,8 +42,8 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory;
import io.druid.query.groupby.RowBasedValueMatcherFactory;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.IndexBuilder;

@@ -51,6 +54,7 @@ import io.druid.segment.QueryableIndexStorageAdapter;
import io.druid.segment.StorageAdapter;
import io.druid.segment.TestHelper;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ConciseBitmapSerdeFactory;
import io.druid.segment.data.IndexedInts;

@@ -371,7 +375,7 @@ public abstract class BaseFilterTest
}

@Override
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
return theFilter.makeMatcher(factory);
}

@@ -411,16 +415,25 @@ public abstract class BaseFilterTest
return Sequences.toList(seq, new ArrayList<List<String>>()).get(0);
}

private List<String> selectColumnValuesMatchingFilterUsingRowBasedValueMatcherFactory(
private List<String> selectColumnValuesMatchingFilterUsingRowBasedColumnSelectorFactory(
final DimFilter filter,
final String selectColumn
)
{
final RowBasedValueMatcherFactory matcherFactory = new RowBasedValueMatcherFactory();
final ValueMatcher matcher = makeFilter(filter).makeMatcher(matcherFactory);
// Generate rowType
final Map<String, ValueType> rowSignature = Maps.newHashMap();
for (String columnName : Iterables.concat(adapter.getAvailableDimensions(), adapter.getAvailableMetrics())) {
rowSignature.put(columnName, adapter.getColumnCapabilities(columnName).getType());
}

// Perform test
final SettableSupplier<InputRow> rowSupplier = new SettableSupplier<>();
final ValueMatcher matcher = makeFilter(filter).makeMatcher(
RowBasedColumnSelectorFactory.create(rowSupplier, rowSignature)
);
final List<String> values = Lists.newArrayList();
for (InputRow row : rows) {
matcherFactory.setRow(row);
rowSupplier.set(row);
if (matcher.matches()) {
values.add((String) row.getRaw(selectColumn));
}

@@ -449,9 +462,9 @@ public abstract class BaseFilterTest
selectCountUsingFilteredAggregator(filter)
);
Assert.assertEquals(
"RowBasedValueMatcherFactory: " + filter.toString(),
"RowBasedColumnSelectorFactory: " + filter.toString(),
expectedRows,
selectColumnValuesMatchingFilterUsingRowBasedValueMatcherFactory(filter, "dim0")
selectColumnValuesMatchingFilterUsingRowBasedColumnSelectorFactory(filter, "dim0")
);
}
}

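The new selectColumnValuesMatchingFilterUsingRowBasedColumnSelectorFactory helper shows the intended wiring: a SettableSupplier feeds rows to a RowBasedColumnSelectorFactory, the filter's matcher is built once against that factory, and the supplier is repointed at each row before asking the matcher. A condensed, self-contained sketch of the same pattern; the class name, dimension name, and sample values are made up, while the Druid calls are the ones that appear in the diff above:

import com.google.common.collect.ImmutableMap;
import io.druid.common.guava.SettableSupplier;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.groupby.RowBasedColumnSelectorFactory;
import io.druid.segment.column.ValueType;

import java.util.Collections;

public class RowBasedMatcherSketch
{
  public static void main(String[] args)
  {
    final SettableSupplier<InputRow> rowSupplier = new SettableSupplier<>();

    // Build the matcher once, against a factory that reads whatever row the supplier currently holds.
    final ValueMatcher matcher = new SelectorDimFilter("dim0", "a", null).toFilter().makeMatcher(
        RowBasedColumnSelectorFactory.create(rowSupplier, ImmutableMap.of("dim0", ValueType.STRING))
    );

    // Point the supplier at a row, then ask the matcher about it.
    rowSupplier.set(
        new MapBasedInputRow(0L, Collections.singletonList("dim0"), ImmutableMap.<String, Object>of("dim0", "a"))
    );
    System.out.println(matcher.matches()); // true for this row
  }
}
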
@@ -206,7 +206,7 @@ public class LongFilteringTest extends BaseFilterTest

assertFilterMatches(
new BoundDimFilter(LONG_COLUMN, " ", "4", false, false, null, null, StringComparators.NUMERIC),
ImmutableList.<String>of()
ImmutableList.<String>of("1", "2", "3", "4")
);

assertFilterMatches(

@@ -158,7 +158,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics, null)
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
Integer rowIndex;