Add dimension type-based interface for query processing (#3570)

* Add dimension type-based interface for query processing

* PR comment changes

* Address PR comments

* Use getters for QueryDimensionInfo

* Split DimensionQueryHelper into base interface and query-specific interfaces

* Treat empty rows as nulls in v2 groupby

* Reduce boxing in SearchQueryRunner

* Add GroupBy empty row handling to MultiValuedDimensionTest

* Address PR comments

* PR comments and refactoring

* More PR comments

* PR comments
This commit is contained in:
Jonathan Wei 2016-12-21 19:11:37 -08:00 committed by David Lim
parent 4ca3b7f1e4
commit 0e5bd8b4d4
52 changed files with 2016 additions and 733 deletions

View File

@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.segment.ColumnValueSelector;
/**
* A grouping of various related objects used during query processing for a single dimension, used for convenience.
*
* Each ColumnSelectorPlus is associated with a single dimension.
*/
public class ColumnSelectorPlus<ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
{
/**
* Helper object that handles row value operations that pertain to a specific query type for this
* dimension within query processing engines.
*/
private final ColumnSelectorStrategyClass columnSelectorStrategy;
/**
* Internal name of the dimension.
*/
private final String name;
/**
* Name of the dimension to be returned in query results.
*/
private final String outputName;
/**
* Column value selector for this dimension, e.g. a DimensionSelector for String dimensions.
*/
private final ColumnValueSelector selector;
public ColumnSelectorPlus(
String columnName,
String outputName,
ColumnSelectorStrategyClass columnSelectorStrategy,
ColumnValueSelector selector
)
{
this.columnSelectorStrategy = columnSelectorStrategy;
this.name = columnName;
this.outputName = outputName;
this.selector = selector;
}
public ColumnSelectorStrategyClass getColumnSelectorStrategy()
{
return columnSelectorStrategy;
}
public String getName()
{
return name;
}
public String getOutputName()
{
return outputName;
}
public ColumnValueSelector getSelector()
{
return selector;
}
}

View File

@ -21,28 +21,28 @@ package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DruidLongPredicate;
import io.druid.query.filter.DruidPredicateFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.BooleanValueMatcher;
import io.druid.segment.filter.Filters;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
public class FilteredAggregatorFactory extends AggregatorFactory
{
@ -211,6 +211,9 @@ public class FilteredAggregatorFactory extends AggregatorFactory
private static class FilteredAggregatorValueMatcherFactory implements ValueMatcherFactory
{
private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY =
new ValueMatcherColumnSelectorStrategyFactory();
private final ColumnSelectorFactory columnSelectorFactory;
public FilteredAggregatorValueMatcherFactory(ColumnSelectorFactory columnSelectorFactory)
@ -228,67 +231,16 @@ public class FilteredAggregatorFactory extends AggregatorFactory
);
}
final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector(
new DefaultDimensionSpec(dimension, dimension)
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
columnSelectorFactory
);
// Compare "value" as null if it's empty.
final String valueString = Strings.emptyToNull(value);
// Missing columns match a null or empty string value, and don't match anything else.
if (selector == null) {
return new BooleanValueMatcher(valueString == null);
}
final int cardinality = selector.getValueCardinality();
if (cardinality >= 0) {
// Dictionary-encoded dimension. Compare by id instead of by value to save time.
final int valueId = selector.lookupId(valueString);
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return valueString == null;
} else {
for (int i = 0; i < size; ++i) {
if (row.get(i) == valueId) {
return true;
}
}
return false;
}
}
};
} else {
// Not dictionary-encoded. Skip the optimization.
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return valueString == null;
} else {
for (int i = 0; i < size; ++i) {
if (Objects.equals(selector.lookupName(row.get(i)), valueString)) {
return true;
}
}
return false;
}
}
};
}
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
return strategy.getValueMatcher(dimension, columnSelectorFactory, value);
}
public ValueMatcher makeValueMatcher(final String dimension, final DruidPredicateFactory predicateFactory)
@ -298,77 +250,18 @@ public class FilteredAggregatorFactory extends AggregatorFactory
case LONG:
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
case STRING:
return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate());
default:
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
}
}
public ValueMatcher makeStringValueMatcher(final String dimension, final Predicate<String> predicate)
{
final DimensionSelector selector = columnSelectorFactory.makeDimensionSelector(
new DefaultDimensionSpec(dimension, dimension)
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
columnSelectorFactory
);
final boolean doesMatchNull = predicate.apply(null);
if (selector == null) {
return new BooleanValueMatcher(doesMatchNull);
}
final int cardinality = selector.getValueCardinality();
if (cardinality >= 0) {
// Dictionary-encoded dimension. Check every value; build a bitset of matching ids.
final BitSet valueIds = new BitSet(cardinality);
for (int i = 0; i < cardinality; i++) {
if (predicate.apply(selector.lookupName(i))) {
valueIds.set(i);
}
}
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return doesMatchNull;
} else {
for (int i = 0; i < size; ++i) {
if (valueIds.get(row.get(i))) {
return true;
}
}
return false;
}
}
};
} else {
// Not dictionary-encoded. Skip the optimization.
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return doesMatchNull;
} else {
for (int i = 0; i < size; ++i) {
if (predicate.apply(selector.lookupName(row.get(i)))) {
return true;
}
}
return false;
}
}
};
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
return strategy.getValueMatcher(dimension, columnSelectorFactory, predicateFactory);
default:
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
}
}

View File

@ -23,74 +23,57 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.util.Arrays;
import java.util.List;
public class CardinalityAggregator implements Aggregator
{
private static final String NULL_STRING = "\u0000";
private final List<DimensionSelector> selectorList;
private final String name;
private final List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList;
private final boolean byRow;
private static final HashFunction hashFn = Hashing.murmur3_128();
public static final char SEPARATOR = '\u0001';
public static final HashFunction hashFn = Hashing.murmur3_128();
protected static void hashRow(List<DimensionSelector> selectorList, HyperLogLogCollector collector)
protected static void hashRow(
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
HyperLogLogCollector collector
)
{
final Hasher hasher = hashFn.newHasher();
for (int k = 0; k < selectorList.size(); ++k) {
for (int k = 0; k < selectorPlusList.size(); ++k) {
if (k != 0) {
hasher.putByte((byte) 0);
}
final DimensionSelector selector = selectorList.get(k);
final IndexedInts row = selector.getRow();
final int size = row.size();
// nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases.
if (size == 1) {
final String value = selector.lookupName(row.get(0));
hasher.putUnencodedChars(value != null ? value : NULL_STRING);
} else if (size != 0) {
final String[] values = new String[size];
for (int i = 0; i < size; ++i) {
final String value = selector.lookupName(row.get(i));
values[i] = value != null ? value : NULL_STRING;
}
// Values need to be sorted to ensure consistent multi-value ordering across different segments
Arrays.sort(values);
for (int i = 0; i < size; ++i) {
if (i != 0) {
hasher.putChar(SEPARATOR);
}
hasher.putUnencodedChars(values[i]);
}
}
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus = selectorPlusList.get(k);
selectorPlus.getColumnSelectorStrategy().hashRow(selectorPlus.getSelector(), hasher);
}
collector.add(hasher.hash().asBytes());
}
protected static void hashValues(final List<DimensionSelector> selectors, HyperLogLogCollector collector)
protected static void hashValues(
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
HyperLogLogCollector collector
)
{
for (final DimensionSelector selector : selectors) {
for (final Integer index : selector.getRow()) {
final String value = selector.lookupName(index);
collector.add(hashFn.hashUnencodedChars(value == null ? NULL_STRING : value).asBytes());
}
for (final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus : selectorPlusList) {
selectorPlus.getColumnSelectorStrategy().hashValues(selectorPlus.getSelector(), collector);
}
}
private HyperLogLogCollector collector;
public CardinalityAggregator(
List<DimensionSelector> selectorList,
String name,
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
boolean byRow
)
{
this.selectorList = selectorList;
this.name = name;
this.selectorPlusList = selectorPlusList;
this.collector = HyperLogLogCollector.makeLatestCollector();
this.byRow = byRow;
}
@ -99,9 +82,9 @@ public class CardinalityAggregator implements Aggregator
public void aggregate()
{
if (byRow) {
hashRow(selectorList, collector);
hashRow(selectorPlusList, collector);
} else {
hashValues(selectorList, collector);
hashValues(selectorPlusList, collector);
}
}
@ -138,7 +121,7 @@ public class CardinalityAggregator implements Aggregator
@Override
public Aggregator clone()
{
return new CardinalityAggregator(selectorList, byRow);
return new CardinalityAggregator(name, selectorPlusList, byRow);
}
@Override

View File

@ -23,26 +23,28 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.StringUtils;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionHandlerUtils;
import org.apache.commons.codec.binary.Base64;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@ -94,6 +96,8 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
private static final byte CACHE_TYPE_ID = (byte) 0x8;
private static final byte CACHE_KEY_SEPARATOR = (byte) 0xFF;
private static final CardinalityAggregatorColumnSelectorStrategyFactory STRATEGY_FACTORY =
new CardinalityAggregatorColumnSelectorStrategyFactory();
private final String name;
private final List<DimensionSpec> fields;
@ -133,44 +137,36 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
@Override
public Aggregator factorize(final ColumnSelectorFactory columnFactory)
{
List<DimensionSelector> selectors = makeDimensionSelectors(columnFactory);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList =
Arrays.asList(DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
fields,
columnFactory
));
if (selectors.isEmpty()) {
if (selectorPlusList.isEmpty()) {
return Aggregators.noopAggregator();
}
return new CardinalityAggregator(selectors, byRow);
return new CardinalityAggregator(name, selectorPlusList, byRow);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
List<DimensionSelector> selectors = makeDimensionSelectors(columnFactory);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList =
Arrays.asList(DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
fields,
columnFactory
));
if (selectors.isEmpty()) {
if (selectorPlusList.isEmpty()) {
return Aggregators.noopBufferAggregator();
}
return new CardinalityBufferAggregator(selectors, byRow);
}
private List<DimensionSelector> makeDimensionSelectors(final ColumnSelectorFactory columnFactory)
{
return Lists.newArrayList(
Iterables.filter(
Iterables.transform(
fields, new Function<DimensionSpec, DimensionSelector>()
{
@Override
public DimensionSelector apply(DimensionSpec input)
{
return columnFactory.makeDimensionSelector(input);
}
}
), Predicates.notNull()
)
);
return new CardinalityBufferAggregator(selectorPlusList, byRow);
}
@Override

View File

@ -20,25 +20,26 @@
package io.druid.query.aggregation.cardinality;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
import java.util.List;
public class CardinalityBufferAggregator implements BufferAggregator
{
private final List<DimensionSelector> selectorList;
private final List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList;
private final boolean byRow;
private static final byte[] EMPTY_BYTES = HyperLogLogCollector.makeEmptyVersionedByteArray();
public CardinalityBufferAggregator(
List<DimensionSelector> selectorList,
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> selectorPlusList,
boolean byRow
)
{
this.selectorList = selectorList;
this.selectorPlusList = selectorPlusList;
this.byRow = byRow;
}
@ -62,9 +63,9 @@ public class CardinalityBufferAggregator implements BufferAggregator
try {
final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
if (byRow) {
CardinalityAggregator.hashRow(selectorList, collector);
CardinalityAggregator.hashRow(selectorPlusList, collector);
} else {
CardinalityAggregator.hashValues(selectorList, collector);
CardinalityAggregator.hashValues(selectorPlusList, collector);
}
}
finally {

View File

@ -0,0 +1,46 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.cardinality.types;
import com.google.common.hash.Hasher;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.segment.ColumnValueSelector;
public interface CardinalityAggregatorColumnSelectorStrategy<ValueSelectorType extends ColumnValueSelector> extends
ColumnSelectorStrategy
{
/***
* Retrieve the current row from dimSelector and add the row values to the hasher.
*
* @param dimSelector Dimension value selector
* @param hasher Hasher used for cardinality aggregator calculations
*/
void hashRow(ValueSelectorType dimSelector, Hasher hasher);
/**
* Retrieve the current row from dimSelector and add the row values to HyperLogLogCollector.
*
* @param dimSelector Dimension value selector
* @param collector HLL collector used for cardinality aggregator calculations
*/
void hashValues(ValueSelectorType dimSelector, HyperLogLogCollector collector);
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.cardinality.types;
import io.druid.java.util.common.IAE;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
public class CardinalityAggregatorColumnSelectorStrategyFactory
implements ColumnSelectorStrategyFactory<CardinalityAggregatorColumnSelectorStrategy>
{
@Override
public CardinalityAggregatorColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities
)
{
ValueType type = capabilities.getType();
switch(type) {
case STRING:
return new StringCardinalityAggregatorColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.cardinality.types;
import com.google.common.hash.Hasher;
import io.druid.query.aggregation.cardinality.CardinalityAggregator;
import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;
import java.util.Arrays;
public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DimensionSelector>
{
public static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
public static final char CARDINALITY_AGG_SEPARATOR = '\u0001';
@Override
public void hashRow(DimensionSelector dimSelector, Hasher hasher)
{
final IndexedInts row = dimSelector.getRow();
final int size = row.size();
// nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases.
if (size == 1) {
final String value = dimSelector.lookupName(row.get(0));
hasher.putUnencodedChars(nullToSpecial(value));
} else if (size != 0) {
final String[] values = new String[size];
for (int i = 0; i < size; ++i) {
final String value = dimSelector.lookupName(row.get(i));
values[i] = nullToSpecial(value);
}
// Values need to be sorted to ensure consistent multi-value ordering across different segments
Arrays.sort(values);
for (int i = 0; i < size; ++i) {
if (i != 0) {
hasher.putChar(CARDINALITY_AGG_SEPARATOR);
}
hasher.putUnencodedChars(values[i]);
}
}
}
@Override
public void hashValues(DimensionSelector dimSelector, HyperLogLogCollector collector)
{
for (IntIterator rowIt = dimSelector.getRow().iterator(); rowIt.hasNext(); ) {
int index = rowIt.nextInt();
final String value = dimSelector.lookupName(index);
collector.add(CardinalityAggregator.hashFn.hashUnencodedChars(nullToSpecial(value)).asBytes());
}
}
private String nullToSpecial(String value)
{
return value == null ? CARDINALITY_AGG_NULL_STRING : value;
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.dimension;
/**
* Base type for strategy objects that handle value type operations pertaining to a specific query type
*/
public interface ColumnSelectorStrategy
{
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.dimension;
import io.druid.segment.column.ColumnCapabilities;
public interface ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
{
ColumnSelectorStrategyClass makeColumnSelectorStrategy(ColumnCapabilities capabilities);
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.filter;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.util.BitSet;
import java.util.Objects;
public class StringValueMatcherColumnSelectorStrategy implements ValueMatcherColumnSelectorStrategy
{
@Override
public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final String value)
{
final String valueStr = Strings.emptyToNull(value);
final DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(columnName, columnName)
);
// if matching against null, rows with size 0 should also match
final boolean matchNull = Strings.isNullOrEmpty(valueStr);
final int cardinality = selector.getValueCardinality();
if (cardinality >= 0) {
// Dictionary-encoded dimension. Compare by id instead of by value to save time.
final int valueId = selector.lookupId(valueStr);
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return matchNull;
} else {
for (int i = 0; i < size; ++i) {
if (row.get(i) == valueId) {
return true;
}
}
return false;
}
}
};
} else {
// Not dictionary-encoded. Skip the optimization.
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return matchNull;
} else {
for (int i = 0; i < size; ++i) {
if (Objects.equals(selector.lookupName(row.get(i)), valueStr)) {
return true;
}
}
return false;
}
}
};
}
}
@Override
public ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory)
{
final DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(columnName, columnName)
);
final Predicate<String> predicate = predicateFactory.makeStringPredicate();
final int cardinality = selector.getValueCardinality();
final boolean matchNull = predicate.apply(null);
if (cardinality >= 0) {
// Dictionary-encoded dimension. Check every value; build a bitset of matching ids.
final BitSet valueIds = new BitSet(cardinality);
for (int i = 0; i < cardinality; i++) {
if (predicate.apply(selector.lookupName(i))) {
valueIds.set(i);
}
}
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return matchNull;
} else {
for (int i = 0; i < size; ++i) {
if (valueIds.get(row.get(i))) {
return true;
}
}
return false;
}
}
};
} else {
// Not dictionary-encoded. Skip the optimization.
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = selector.getRow();
final int size = row.size();
if (size == 0) {
// null should match empty rows in multi-value columns
return matchNull;
} else {
for (int i = 0; i < size; ++i) {
if (predicate.apply(selector.lookupName(row.get(i)))) {
return true;
}
}
return false;
}
}
};
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.filter;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.segment.ColumnSelectorFactory;
public interface ValueMatcherColumnSelectorStrategy extends ColumnSelectorStrategy
{
/**
* Create a single value ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory.
*
* @param cursor ColumnSelectorFactory for creating dimension value selectors
* @param value Value to match against
* @return ValueMatcher that matches on 'value'
*/
ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, String value);
/**
* Create a predicate-based ValueMatcher, used for filtering by QueryableIndexStorageAdapter and FilteredAggregatorFactory.
*
* @param cursor ColumnSelectorFactory for creating dimension value selectors
* @param predicateFactory A DruidPredicateFactory that provides the filter predicates to be matched
* @return A ValueMatcher that applies the predicate for this DimensionQueryHelper's value type from the predicateFactory
*/
ValueMatcher getValueMatcher(String columnName, ColumnSelectorFactory cursor, final DruidPredicateFactory predicateFactory);
}

View File

@ -0,0 +1,43 @@
package io.druid.query.filter;
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io.druid.java.util.common.IAE;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
public class ValueMatcherColumnSelectorStrategyFactory
implements ColumnSelectorStrategyFactory<ValueMatcherColumnSelectorStrategy>
{
@Override
public ValueMatcherColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities
)
{
ValueType type = capabilities.getType();
switch (type) {
case STRING:
return new StringValueMatcherColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
}

View File

@ -40,7 +40,7 @@ public interface ValueMatcherFactory
* An implementation of this method should be able to handle dimensions of various types.
*
* @param dimension The dimension to filter.
* @param value The value to match against.
* @param value The value to match against, represented as a String.
*
* @return An object that matches row values on the provided value.
*/

View File

@ -34,15 +34,21 @@ import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.ResourceClosingSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.strategy.GroupByStrategyV2;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.VirtualColumns;
import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;
@ -58,6 +64,19 @@ import java.util.NoSuchElementException;
public class GroupByQueryEngineV2
{
private static final GroupByStrategyFactory STRATEGY_FACTORY = new GroupByStrategyFactory();
private static GroupByColumnSelectorPlus[] createGroupBySelectorPlus(ColumnSelectorPlus<GroupByColumnSelectorStrategy>[] baseSelectorPlus)
{
GroupByColumnSelectorPlus[] retInfo = new GroupByColumnSelectorPlus[baseSelectorPlus.length];
int curPos = 0;
for (int i = 0; i < retInfo.length; i++) {
retInfo[i] = new GroupByColumnSelectorPlus(baseSelectorPlus[i], curPos);
curPos += retInfo[i].getColumnSelectorStrategy().getGroupingKeySize();
}
return retInfo;
}
private GroupByQueryEngineV2()
{
// No instantiation
@ -89,7 +108,7 @@ public class GroupByQueryEngineV2
false
);
final Grouper.KeySerde<ByteBuffer> keySerde = new GroupByEngineKeySerde(query.getDimensions().size());
final ResourceHolder<ByteBuffer> bufferHolder = intermediateResultsBufferPool.take();
final String fudgeTimestampString = Strings.emptyToNull(
@ -115,13 +134,18 @@ public class GroupByQueryEngineV2
@Override
public GroupByEngineIterator make()
{
ColumnSelectorPlus<GroupByColumnSelectorStrategy>[] selectorPlus = DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
query.getDimensions(),
cursor
);
return new GroupByEngineIterator(
query,
config,
cursor,
bufferHolder.get(),
keySerde,
fudgeTimestamp
fudgeTimestamp,
createGroupBySelectorPlus(selectorPlus)
);
}
@ -147,6 +171,177 @@ public class GroupByQueryEngineV2
);
}
private static class GroupByStrategyFactory implements ColumnSelectorStrategyFactory<GroupByColumnSelectorStrategy>
{
@Override
public GroupByColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities
)
{
ValueType type = capabilities.getType();
switch(type) {
case STRING:
return new StringGroupByColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
}
/**
* Contains a collection of query processing methods for type-specific operations used exclusively by
* GroupByQueryEngineV2.
*
* Each GroupByColumnSelectorStrategy is associated with a single dimension.
*/
private interface GroupByColumnSelectorStrategy extends ColumnSelectorStrategy
{
/**
* Return the size, in bytes, of this dimension's values in the grouping key.
*
* For example, a String implementation would return 4, the size of an int.
*
* @return size, in bytes, of this dimension's values in the grouping key.
*/
int getGroupingKeySize();
/**
* Read a value from a grouping key and add it to the group by query result map, using the output name specified
* in a DimensionSpec.
*
* An implementation may choose to not add anything to the result map
* (e.g., as the String implementation does for empty rows)
*
* selectorPlus provides access to:
* - the keyBufferPosition offset from which to read the value
* - the dimension value selector
* - the DimensionSpec for this dimension from the query
*
* @param selectorPlus dimension info containing the key offset, value selector, and dimension spec
* @param resultMap result map for the group by query being served
* @param key grouping key
*/
void processValueFromGroupingKey(
GroupByColumnSelectorPlus selectorPlus,
ByteBuffer key,
Map<String, Object> resultMap
);
/**
* Retrieve a row object from the ColumnSelectorPlus and put it in valuess at columnIndex.
*
* @param selector Value selector for a column.
* @param columnIndex Index of the column within the row values array
* @param valuess Row values array, one index per column
*/
void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess);
/**
* Read the first value within a row values object (IndexedInts, IndexedLongs, etc.) and write that value
* to the keyBuffer at keyBufferPosition. If rowSize is 0, write GROUP_BY_MISSING_VALUE instead.
*
* If the size of the row is > 0, write 1 to stack[] at columnIndex, otherwise write 0.
*
* @param keyBufferPosition Starting offset for this column's value within the grouping key.
* @param columnIndex Index of the column within the row values array
* @param rowObj Row value object for this column (e.g., IndexedInts)
* @param keyBuffer grouping key
* @param stack array containing the current within-row value index for each column
*/
void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack);
/**
* If rowValIdx is less than the size of rowObj (haven't handled all of the row values):
* First, read the value at rowValIdx from a rowObj and write that value to the keyBuffer at keyBufferPosition.
* Then return true
*
* Otherwise, return false.
*
* @param keyBufferPosition Starting offset for this column's value within the grouping key.
* @param rowObj Row value object for this column (e.g., IndexedInts)
* @param rowValIdx Index of the current value being grouped on within the row
* @param keyBuffer grouping key
* @return true if rowValIdx < size of rowObj, false otherwise
*/
boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer);
}
private static class StringGroupByColumnSelectorStrategy implements GroupByColumnSelectorStrategy
{
private static final int GROUP_BY_MISSING_VALUE = -1;
@Override
public int getGroupingKeySize()
{
return Ints.BYTES;
}
@Override
public void processValueFromGroupingKey(GroupByColumnSelectorPlus selectorPlus, ByteBuffer key, Map<String, Object> resultMap)
{
final int id = key.getInt(selectorPlus.getKeyBufferPosition());
// GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
if (id != GROUP_BY_MISSING_VALUE) {
resultMap.put(
selectorPlus.getOutputName(),
((DimensionSelector) selectorPlus.getSelector()).lookupName(id)
);
} else {
resultMap.put(selectorPlus.getOutputName(), "");
}
}
@Override
public void initColumnValues(ColumnValueSelector selector, int columnIndex, Object[] valuess)
{
DimensionSelector dimSelector = (DimensionSelector) selector;
IndexedInts row = dimSelector.getRow();
valuess[columnIndex] = row;
}
@Override
public void initGroupingKeyColumnValue(int keyBufferPosition, int columnIndex, Object rowObj, ByteBuffer keyBuffer, int[] stack)
{
IndexedInts row = (IndexedInts) rowObj;
int rowSize = row.size();
initializeGroupingKeyV2Dimension(row, rowSize, keyBuffer, keyBufferPosition);
stack[columnIndex] = rowSize == 0 ? 0 : 1;
}
@Override
public boolean checkRowIndexAndAddValueToGroupingKey(int keyBufferPosition, Object rowObj, int rowValIdx, ByteBuffer keyBuffer)
{
IndexedInts row = (IndexedInts) rowObj;
int rowSize = row.size();
if (rowValIdx < rowSize) {
keyBuffer.putInt(
keyBufferPosition,
row.get(rowValIdx)
);
return true;
} else {
return false;
}
}
private void initializeGroupingKeyV2Dimension(
final IndexedInts values,
final int rowSize,
final ByteBuffer keyBuffer,
final int keyBufferPosition
)
{
if (rowSize == 0) {
keyBuffer.putInt(keyBufferPosition, GROUP_BY_MISSING_VALUE);
} else {
keyBuffer.putInt(keyBufferPosition, values.get(0));
}
}
}
private static class GroupByEngineIterator implements Iterator<Row>, Closeable
{
private final GroupByQuery query;
@ -155,10 +350,10 @@ public class GroupByQueryEngineV2
private final ByteBuffer buffer;
private final Grouper.KeySerde<ByteBuffer> keySerde;
private final DateTime timestamp;
private final DimensionSelector[] selectors;
private final ByteBuffer keyBuffer;
private final int[] stack;
private final IndexedInts[] valuess;
private final Object[] valuess;
private final GroupByColumnSelectorPlus[] dims;
private int stackp = Integer.MIN_VALUE;
private boolean currentRowWasPartiallyAggregated = false;
@ -169,8 +364,8 @@ public class GroupByQueryEngineV2
final GroupByQueryConfig config,
final Cursor cursor,
final ByteBuffer buffer,
final Grouper.KeySerde<ByteBuffer> keySerde,
final DateTime fudgeTimestamp
final DateTime fudgeTimestamp,
final GroupByColumnSelectorPlus[] dims
)
{
final int dimCount = query.getDimensions().size();
@ -179,14 +374,11 @@ public class GroupByQueryEngineV2
this.querySpecificConfig = config.withOverrides(query);
this.cursor = cursor;
this.buffer = buffer;
this.keySerde = keySerde;
this.keySerde = new GroupByEngineKeySerde(dims);
this.keyBuffer = ByteBuffer.allocate(keySerde.keySize());
this.selectors = new DimensionSelector[dimCount];
for (int i = 0; i < dimCount; i++) {
this.selectors[i] = cursor.makeDimensionSelector(query.getDimensions().get(i));
}
this.dims = dims;
this.stack = new int[dimCount];
this.valuess = new IndexedInts[dimCount];
this.valuess = new Object[dimCount];
// Time is the same for every row in the cursor
this.timestamp = fudgeTimestamp != null ? fudgeTimestamp : cursor.getTime();
@ -226,19 +418,20 @@ outer:
// Set up stack, valuess, and first grouping in keyBuffer for this row
stackp = stack.length - 1;
for (int i = 0; i < selectors.length; i++) {
final DimensionSelector selector = selectors[i];
valuess[i] = selector == null ? EmptyIndexedInts.EMPTY_INDEXED_INTS : selector.getRow();
final int position = Ints.BYTES * i;
if (valuess[i].size() == 0) {
stack[i] = 0;
keyBuffer.putInt(position, -1);
} else {
stack[i] = 1;
keyBuffer.putInt(position, valuess[i].get(0));
}
for (int i = 0; i < dims.length; i++) {
GroupByColumnSelectorStrategy strategy = dims[i].getColumnSelectorStrategy();
strategy.initColumnValues(
dims[i].getSelector(),
i,
valuess
);
strategy.initGroupingKeyColumnValue(
dims[i].getKeyBufferPosition(),
i,
valuess[i],
keyBuffer,
stack
);
}
}
@ -256,28 +449,29 @@ outer:
doAggregate = false;
}
if (stackp >= 0 && stack[stackp] < valuess[stackp].size()) {
// Load next value for current slot
keyBuffer.putInt(
Ints.BYTES * stackp,
valuess[stackp].get(stack[stackp])
if (stackp >= 0) {
doAggregate = dims[stackp].getColumnSelectorStrategy().checkRowIndexAndAddValueToGroupingKey(
dims[stackp].getKeyBufferPosition(),
valuess[stackp],
stack[stackp],
keyBuffer
);
if (doAggregate) {
stack[stackp]++;
// Reset later slots
for (int i = stackp + 1; i < stack.length; i++) {
final int position = Ints.BYTES * i;
if (valuess[i].size() == 0) {
stack[i] = 0;
keyBuffer.putInt(position, -1);
} else {
stack[i] = 1;
keyBuffer.putInt(position, valuess[i].get(0));
dims[i].getColumnSelectorStrategy().initGroupingKeyColumnValue(
dims[i].getKeyBufferPosition(),
i,
valuess[i],
keyBuffer,
stack
);
}
}
stackp = stack.length - 1;
doAggregate = true;
} else {
stackp--;
}
} else {
stackp--;
}
@ -299,16 +493,13 @@ outer:
Map<String, Object> theMap = Maps.newLinkedHashMap();
// Add dimensions.
for (int i = 0; i < selectors.length; i++) {
final int id = entry.getKey().getInt(Ints.BYTES * i);
if (id >= 0) {
theMap.put(
query.getDimensions().get(i).getOutputName(),
selectors[i].lookupName(id)
for (GroupByColumnSelectorPlus selectorPlus : dims) {
selectorPlus.getColumnSelectorStrategy().processValueFromGroupingKey(
selectorPlus,
entry.getKey(),
theMap
);
}
}
// Add aggregations.
for (int i = 0; i < entry.getValues().length; i++) {
@ -356,9 +547,13 @@ outer:
{
private final int keySize;
public GroupByEngineKeySerde(final int dimCount)
public GroupByEngineKeySerde(final GroupByColumnSelectorPlus dims[])
{
this.keySize = dimCount * Ints.BYTES;
int keySize = 0;
for (GroupByColumnSelectorPlus selectorPlus : dims) {
keySize += selectorPlus.getColumnSelectorStrategy().getGroupingKeySize();
}
this.keySize = keySize;
}
@Override
@ -400,4 +595,28 @@ outer:
// No state, nothing to reset
}
}
private static class GroupByColumnSelectorPlus extends ColumnSelectorPlus<GroupByColumnSelectorStrategy>
{
/**
* Indicates the offset of this dimension's value within the grouping key.
*/
private int keyBufferPosition;
public GroupByColumnSelectorPlus(ColumnSelectorPlus<GroupByColumnSelectorStrategy> baseInfo, int keyBufferPosition)
{
super(
baseInfo.getName(),
baseInfo.getOutputName(),
baseInfo.getColumnSelectorStrategy(),
baseInfo.getSelector()
);
this.keyBufferPosition = keyBufferPosition;
}
public int getKeyBufferPosition()
{
return keyBufferPosition;
}
}
}

View File

@ -24,7 +24,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.collections.bitmap.BitmapFactory;
import io.druid.collections.bitmap.ImmutableBitmap;
import io.druid.collections.bitmap.MutableBitmap;
@ -35,11 +34,14 @@ import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.Druids;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.Result;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.IdentityExtractionFn;
import io.druid.query.filter.Filter;
@ -47,7 +49,9 @@ import io.druid.query.search.search.SearchHit;
import io.druid.query.search.search.SearchQuery;
import io.druid.query.search.search.SearchQuerySpec;
import io.druid.segment.ColumnSelectorBitmapIndexSelector;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.DimensionSelector;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;
@ -55,24 +59,96 @@ import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import org.apache.commons.lang.mutable.MutableInt;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntRBTreeMap;
import org.joda.time.Interval;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
*/
public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
{
private static final SearchStrategyFactory STRATEGY_FACTORY = new SearchStrategyFactory();
private static final EmittingLogger log = new EmittingLogger(SearchQueryRunner.class);
private final Segment segment;
private static class SearchStrategyFactory implements ColumnSelectorStrategyFactory<SearchColumnSelectorStrategy>
{
@Override
public SearchColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities
)
{
ValueType type = capabilities.getType();
switch(type) {
case STRING:
return new StringSearchColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
}
public interface SearchColumnSelectorStrategy<ValueSelectorType extends ColumnValueSelector> extends ColumnSelectorStrategy
{
/**
* Read the current row from dimSelector and update the search result set.
*
* For each row value:
* 1. Check if searchQuerySpec accept()s the value
* 2. If so, add the value to the result set and increment the counter for that value
* 3. If the size of the result set reaches the limit after adding a value, return early.
*
* @param outputName Output name for this dimension in the search query being served
* @param dimSelector Dimension value selector
* @param searchQuerySpec Spec for the search query
* @param set The result set of the search query
* @param limit The limit of the search query
*/
void updateSearchResultSet(
String outputName,
ValueSelectorType dimSelector,
SearchQuerySpec searchQuerySpec,
int limit,
Object2IntRBTreeMap<SearchHit> set
);
}
public static class StringSearchColumnSelectorStrategy implements SearchColumnSelectorStrategy<DimensionSelector>
{
@Override
public void updateSearchResultSet(
String outputName,
DimensionSelector selector,
SearchQuerySpec searchQuerySpec,
int limit,
final Object2IntRBTreeMap<SearchHit> set
)
{
if (selector != null) {
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (searchQuerySpec.accept(dimVal)) {
set.addTo(new SearchHit(outputName, dimVal), 1);
if (set.size() >= limit) {
return;
}
}
}
}
}
}
public SearchQueryRunner(Segment segment)
{
this.segment = segment;
@ -93,7 +169,6 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
final List<DimensionSpec> dimensions = query.getDimensions();
final SearchQuerySpec searchQuerySpec = query.getQuery();
final int limit = query.getLimit();
final boolean descending = query.isDescending();
final List<Interval> intervals = query.getQuerySegmentSpec().getIntervals();
if (intervals.size() != 1) {
throw new IAE("Should only have one interval, got[%s]", intervals);
@ -103,85 +178,25 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
// Closing this will cause segfaults in unit tests.
final QueryableIndex index = segment.asQueryableIndex();
if (index != null) {
final TreeMap<SearchHit, MutableInt> retVal = Maps.newTreeMap(query.getSort().getComparator());
final StorageAdapter storageAdapter = segment.asStorageAdapter();
Iterable<DimensionSpec> dimsToSearch;
if (dimensions == null || dimensions.isEmpty()) {
dimsToSearch = Iterables.transform(index.getAvailableDimensions(), Druids.DIMENSION_IDENTITY);
} else {
dimsToSearch = dimensions;
}
final List<DimensionSpec> bitmapDims = Lists.newArrayList();
final List<DimensionSpec> nonBitmapDims = Lists.newArrayList();
partitionDimensionList(index, storageAdapter, dimensions, bitmapDims, nonBitmapDims);
final BitmapFactory bitmapFactory = index.getBitmapFactoryForDimensions();
final Object2IntRBTreeMap<SearchHit> retVal = new Object2IntRBTreeMap<SearchHit>(query.getSort().getComparator());
retVal.defaultReturnValue(0);
final ImmutableBitmap baseFilter =
filter == null ? null : filter.getBitmapIndex(new ColumnSelectorBitmapIndexSelector(bitmapFactory, index));
ImmutableBitmap timeFilteredBitmap;
if (!interval.contains(segment.getDataInterval())) {
MutableBitmap timeBitmap = bitmapFactory.makeEmptyMutableBitmap();
final Column timeColumn = index.getColumn(Column.TIME_COLUMN_NAME);
try (final GenericColumn timeValues = timeColumn.getGenericColumn()) {
int startIndex = Math.max(0, getStartIndexOfTime(timeValues, interval.getStartMillis(), true));
int endIndex = Math.min(
timeValues.length() - 1,
getStartIndexOfTime(timeValues, interval.getEndMillis(), false)
);
for (int i = startIndex; i <= endIndex; i++) {
timeBitmap.add(i);
}
final ImmutableBitmap finalTimeBitmap = bitmapFactory.makeImmutableBitmap(timeBitmap);
timeFilteredBitmap =
(baseFilter == null) ? finalTimeBitmap : finalTimeBitmap.intersection(baseFilter);
}
} else {
timeFilteredBitmap = baseFilter;
}
for (DimensionSpec dimension : dimsToSearch) {
final Column column = index.getColumn(dimension.getDimension());
if (column == null) {
continue;
}
final BitmapIndex bitmapIndex = column.getBitmapIndex();
ExtractionFn extractionFn = dimension.getExtractionFn();
if (extractionFn == null) {
extractionFn = IdentityExtractionFn.getInstance();
}
if (bitmapIndex != null) {
for (int i = 0; i < bitmapIndex.getCardinality(); ++i) {
String dimVal = Strings.nullToEmpty(extractionFn.apply(bitmapIndex.getValue(i)));
if (!searchQuerySpec.accept(dimVal)) {
continue;
}
ImmutableBitmap bitmap = bitmapIndex.getBitmap(i);
if (timeFilteredBitmap != null) {
bitmap = bitmapFactory.intersection(Arrays.asList(timeFilteredBitmap, bitmap));
}
if (bitmap.size() > 0) {
MutableInt counter = new MutableInt(bitmap.size());
MutableInt prev = retVal.put(new SearchHit(dimension.getOutputName(), dimVal), counter);
if (prev != null) {
counter.add(prev.intValue());
}
if (retVal.size() >= limit) {
// Get results from bitmap supporting dims first
if (!bitmapDims.isEmpty()) {
processBitmapDims(index, filter, interval, bitmapDims, searchQuerySpec, limit, retVal);
// If there are no non-bitmap dims to search, or we've already hit the result limit, just return now
if (nonBitmapDims.size() == 0 || retVal.size() >= limit) {
return makeReturnResult(limit, retVal);
}
}
}
}
}
return makeReturnResult(limit, retVal);
}
final StorageAdapter adapter = segment.asStorageAdapter();
if (adapter == null) {
log.makeAlert("WTF!? Unable to process search query on segment.")
.addData("segment", segment.getIdentifier())
@ -190,71 +205,7 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}
final Iterable<DimensionSpec> dimsToSearch;
if (dimensions == null || dimensions.isEmpty()) {
dimsToSearch = Iterables.transform(adapter.getAvailableDimensions(), Druids.DIMENSION_IDENTITY);
} else {
dimsToSearch = dimensions;
}
final Sequence<Cursor> cursors = adapter.makeCursors(
filter,
interval,
VirtualColumns.EMPTY,
query.getGranularity(),
descending
);
final TreeMap<SearchHit, MutableInt> retVal = cursors.accumulate(
Maps.<SearchHit, SearchHit, MutableInt>newTreeMap(query.getSort().getComparator()),
new Accumulator<TreeMap<SearchHit, MutableInt>, Cursor>()
{
@Override
public TreeMap<SearchHit, MutableInt> accumulate(TreeMap<SearchHit, MutableInt> set, Cursor cursor)
{
if (set.size() >= limit) {
return set;
}
Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (DimensionSpec dim : dimsToSearch) {
dimSelectors.put(
dim.getOutputName(),
cursor.makeDimensionSelector(dim)
);
}
while (!cursor.isDone()) {
for (Map.Entry<String, DimensionSelector> entry : dimSelectors.entrySet()) {
final DimensionSelector selector = entry.getValue();
if (selector != null) {
final IndexedInts vals = selector.getRow();
for (int i = 0; i < vals.size(); ++i) {
final String dimVal = selector.lookupName(vals.get(i));
if (searchQuerySpec.accept(dimVal)) {
MutableInt counter = new MutableInt(1);
MutableInt prev = set.put(new SearchHit(entry.getKey(), dimVal), counter);
if (prev != null) {
counter.add(prev.intValue());
}
if (set.size() >= limit) {
return set;
}
}
}
}
}
cursor.advance();
}
return set;
}
}
);
processNonBitmapDims(query, adapter, filter, interval, limit, nonBitmapDims, searchQuerySpec, retVal);
return makeReturnResult(limit, retVal);
}
@ -289,19 +240,22 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
}
private Sequence<Result<SearchResultValue>> makeReturnResult(
int limit, TreeMap<SearchHit, MutableInt> retVal)
int limit,
Object2IntRBTreeMap<SearchHit> retVal
)
{
Iterable<SearchHit> source = Iterables.transform(
retVal.entrySet(), new Function<Map.Entry<SearchHit, MutableInt>, SearchHit>()
retVal.object2IntEntrySet(), new Function<Object2IntMap.Entry<SearchHit>, SearchHit>()
{
@Override
public SearchHit apply(Map.Entry<SearchHit, MutableInt> input)
public SearchHit apply(Object2IntMap.Entry<SearchHit> input)
{
SearchHit hit = input.getKey();
return new SearchHit(hit.getDimension(), hit.getValue(), input.getValue().intValue());
return new SearchHit(hit.getDimension(), hit.getValue(), input.getIntValue());
}
}
);
return Sequences.simple(
ImmutableList.of(
new Result<SearchResultValue>(
@ -313,4 +267,175 @@ public class SearchQueryRunner implements QueryRunner<Result<SearchResultValue>>
)
);
}
// Split dimension list into bitmap-supporting list and non-bitmap supporting list
private void partitionDimensionList(
QueryableIndex index,
StorageAdapter storageAdapter,
List<DimensionSpec> dimensions,
List<DimensionSpec> bitmapDims,
List<DimensionSpec> nonBitmapDims
)
{
List<DimensionSpec> dimsToSearch;
if (dimensions == null || dimensions.isEmpty()) {
dimsToSearch = Lists.newArrayList(Iterables.transform(
storageAdapter.getAvailableDimensions(),
Druids.DIMENSION_IDENTITY
));
} else {
dimsToSearch = dimensions;
}
if (index != null) {
for (DimensionSpec spec : dimsToSearch) {
ColumnCapabilities capabilities = storageAdapter.getColumnCapabilities(spec.getDimension());
if (capabilities == null) {
continue;
}
if (capabilities.hasBitmapIndexes()) {
bitmapDims.add(spec);
} else {
nonBitmapDims.add(spec);
}
}
} else {
// no QueryableIndex available, so nothing has bitmaps
nonBitmapDims.addAll(dimsToSearch);
}
}
private void processNonBitmapDims(
SearchQuery query,
final StorageAdapter adapter,
Filter filter,
Interval interval,
final int limit,
final List<DimensionSpec> nonBitmapDims,
final SearchQuerySpec searchQuerySpec,
final Object2IntRBTreeMap<SearchHit> retVal
)
{
final Sequence<Cursor> cursors = adapter.makeCursors(
filter,
interval,
VirtualColumns.EMPTY,
query.getGranularity(),
query.isDescending()
);
cursors.accumulate(
retVal,
new Accumulator<Object2IntRBTreeMap<SearchHit>, Cursor>()
{
@Override
public Object2IntRBTreeMap<SearchHit> accumulate(Object2IntRBTreeMap<SearchHit> set, Cursor cursor)
{
if (set.size() >= limit) {
return set;
}
List<ColumnSelectorPlus<SearchColumnSelectorStrategy>> selectorPlusList = Arrays.asList(
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
nonBitmapDims,
cursor
)
);
while (!cursor.isDone()) {
for (ColumnSelectorPlus<SearchColumnSelectorStrategy> selectorPlus : selectorPlusList) {
selectorPlus.getColumnSelectorStrategy().updateSearchResultSet(
selectorPlus.getOutputName(),
selectorPlus.getSelector(),
searchQuerySpec,
limit,
set
);
if (set.size() >= limit) {
return set;
}
}
cursor.advance();
}
return set;
}
}
);
}
private void processBitmapDims(
QueryableIndex index,
Filter filter,
Interval interval,
List<DimensionSpec> bitmapDims,
SearchQuerySpec searchQuerySpec,
int limit,
final Object2IntRBTreeMap<SearchHit> retVal
)
{
final BitmapFactory bitmapFactory = index.getBitmapFactoryForDimensions();
final ImmutableBitmap baseFilter =
filter == null ? null : filter.getBitmapIndex(new ColumnSelectorBitmapIndexSelector(bitmapFactory, index));
ImmutableBitmap timeFilteredBitmap;
if (!interval.contains(segment.getDataInterval())) {
MutableBitmap timeBitmap = bitmapFactory.makeEmptyMutableBitmap();
final Column timeColumn = index.getColumn(Column.TIME_COLUMN_NAME);
try (final GenericColumn timeValues = timeColumn.getGenericColumn()) {
int startIndex = Math.max(0, getStartIndexOfTime(timeValues, interval.getStartMillis(), true));
int endIndex = Math.min(
timeValues.length() - 1,
getStartIndexOfTime(timeValues, interval.getEndMillis(), false)
);
for (int i = startIndex; i <= endIndex; i++) {
timeBitmap.add(i);
}
final ImmutableBitmap finalTimeBitmap = bitmapFactory.makeImmutableBitmap(timeBitmap);
timeFilteredBitmap =
(baseFilter == null) ? finalTimeBitmap : finalTimeBitmap.intersection(baseFilter);
}
} else {
timeFilteredBitmap = baseFilter;
}
for (DimensionSpec dimension : bitmapDims) {
final Column column = index.getColumn(dimension.getDimension());
if (column == null) {
continue;
}
final BitmapIndex bitmapIndex = column.getBitmapIndex();
ExtractionFn extractionFn = dimension.getExtractionFn();
if (extractionFn == null) {
extractionFn = IdentityExtractionFn.getInstance();
}
if (bitmapIndex != null) {
for (int i = 0; i < bitmapIndex.getCardinality(); ++i) {
String dimVal = Strings.nullToEmpty(extractionFn.apply(bitmapIndex.getValue(i)));
if (!searchQuerySpec.accept(dimVal)) {
continue;
}
ImmutableBitmap bitmap = bitmapIndex.getBitmap(i);
if (timeFilteredBitmap != null) {
bitmap = bitmapFactory.intersection(Arrays.asList(timeFilteredBitmap, bitmap));
}
if (bitmap.size() > 0) {
retVal.addTo(new SearchHit(dimension.getOutputName(), dimVal), bitmap.size());
if (retVal.size() >= limit) {
return;
}
}
}
}
}
}
}

View File

@ -24,14 +24,20 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.QueryRunnerHelper;
import io.druid.query.Result;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.query.filter.Filter;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.DimensionSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
@ -39,12 +45,16 @@ import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.VirtualColumns;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import io.druid.timeline.DataSegmentUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@ -52,6 +62,67 @@ import java.util.Map;
*/
public class SelectQueryEngine
{
private static final SelectStrategyFactory STRATEGY_FACTORY = new SelectStrategyFactory();
private static class SelectStrategyFactory implements ColumnSelectorStrategyFactory<SelectColumnSelectorStrategy>
{
@Override
public SelectColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities
)
{
ValueType type = capabilities.getType();
switch(type) {
case STRING:
return new StringSelectColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
}
public interface SelectColumnSelectorStrategy<ValueSelectorType extends ColumnValueSelector> extends ColumnSelectorStrategy
{
/**
* Read the current row from dimSelector and add the row values for a dimension to the result map.
*
* Multi-valued rows should be added to the result as a List, single value rows should be added as a single object.
*
* @param outputName Output name for this dimension in the select query being served
* @param dimSelector Dimension value selector
* @param resultMap Row value map for the current row being retrieved by the select query
*/
void addRowValuesToSelectResult(
String outputName,
ValueSelectorType dimSelector,
Map<String, Object> resultMap
);
}
public static class StringSelectColumnSelectorStrategy implements SelectColumnSelectorStrategy<DimensionSelector>
{
@Override
public void addRowValuesToSelectResult(String outputName, DimensionSelector selector, Map<String, Object> resultMap)
{
if (selector == null) {
resultMap.put(outputName, null);
} else {
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
resultMap.put(outputName, dimVal);
} else {
List<String> dimVals = new ArrayList<>(vals.size());
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
resultMap.put(outputName, dimVals);
}
}
}
}
public Sequence<Result<SelectResultValue>> process(final SelectQuery query, final Segment segment)
{
final StorageAdapter adapter = segment.asStorageAdapter();
@ -106,11 +177,16 @@ public class SelectQueryEngine
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final Map<String, DimensionSelector> dimSelectors = Maps.newHashMap();
for (DimensionSpec dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim.getOutputName(), dimSelector);
builder.addDimension(dim.getOutputName());
final List<ColumnSelectorPlus<SelectColumnSelectorStrategy>> selectorPlusList = Arrays.asList(
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
Lists.newArrayList(dims),
cursor
)
);
for (DimensionSpec dimSpec : dims) {
builder.addDimension(dimSpec.getOutputName());
}
final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
@ -129,26 +205,8 @@ public class SelectQueryEngine
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
theEvent.put(EventHolder.timestampKey, new DateTime(timestampColumnSelector.get()));
for (Map.Entry<String, DimensionSelector> dimSelector : dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
if (selector == null) {
theEvent.put(dim, null);
} else {
final IndexedInts vals = selector.getRow();
if (vals.size() == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else {
List<String> dimVals = Lists.newArrayList();
for (int i = 0; i < vals.size(); ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}
for (ColumnSelectorPlus<SelectColumnSelectorStrategy> selectorPlus : selectorPlusList) {
selectorPlus.getColumnSelectorStrategy().addRowValuesToSelectResult(selectorPlus.getOutputName(), selectorPlus.getSelector(), theEvent);
}
for (Map.Entry<String, ObjectColumnSelector> metSelector : metSelectors.entrySet()) {

View File

@ -22,12 +22,12 @@ package io.druid.query.topn;
import io.druid.collections.StupidPool;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -55,11 +55,11 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
@Override
public TopNParams makeInitParams(
DimensionSelector dimSelector, Cursor cursor
ColumnSelectorPlus selectorPlus, Cursor cursor
)
{
return new TopNParams(
dimSelector,
selectorPlus,
cursor,
Integer.MAX_VALUE
);
@ -91,7 +91,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
PooledTopNAlgorithm.PooledTopNParams singleMetricParam = null;
int[] dimValSelector = null;
try {
singleMetricParam = singleMetricAlgo.makeInitParams(params.getDimSelector(), params.getCursor());
singleMetricParam = singleMetricAlgo.makeInitParams(params.getSelectorPlus(), params.getCursor());
singleMetricAlgo.run(
singleMetricParam,
singleMetricResultBuilder,
@ -109,7 +109,7 @@ public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], T
PooledTopNAlgorithm.PooledTopNParams allMetricsParam = null;
try {
// Run topN for all metrics for top N dimension values
allMetricsParam = allMetricAlgo.makeInitParams(params.getDimSelector(), params.getCursor());
allMetricsParam = allMetricAlgo.makeInitParams(params.getSelectorPlus(), params.getCursor());
allMetricAlgo.run(
allMetricsParam,
resultBuilder,

View File

@ -36,7 +36,7 @@ import java.util.List;
public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Parameters extends TopNParams>
implements TopNAlgorithm<DimValSelector, Parameters>
{
protected static Aggregator[] makeAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
public static Aggregator[] makeAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
{
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
@ -58,7 +58,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
return aggregators;
}
private final Capabilities capabilities;
protected final Capabilities capabilities;
protected BaseTopNAlgorithm(Capabilities capabilities)
{
@ -145,12 +145,12 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
DimValAggregateStore dimValAggregateStore
);
protected class AggregatorArrayProvider extends BaseArrayProvider<Aggregator[][]>
public static class AggregatorArrayProvider extends BaseArrayProvider<Aggregator[][]>
{
Aggregator[][] expansionAggs;
int cardinality;
public AggregatorArrayProvider(DimensionSelector dimSelector, TopNQuery query, int cardinality)
public AggregatorArrayProvider(DimensionSelector dimSelector, TopNQuery query, int cardinality, Capabilities capabilities)
{
super(dimSelector, query, capabilities);

View File

@ -20,18 +20,18 @@
package io.druid.query.topn;
import com.google.common.collect.Maps;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.topn.types.TopNColumnSelectorStrategy;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.util.Map;
/**
* This has to be its own strategy because the pooled topn algorithm assumes each index is unique, and cannot handle multiple index numerals referencing the same dimension value.
*/
public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][], Map<String, Aggregator[]>, TopNParams>
public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][], Map<Comparable, Aggregator[]>, TopNParams>
{
private final TopNQuery query;
@ -47,12 +47,12 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
@Override
public TopNParams makeInitParams(
final DimensionSelector dimSelector,
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
final Cursor cursor
)
{
return new TopNParams(
dimSelector,
selectorPlus,
cursor,
Integer.MAX_VALUE
);
@ -61,16 +61,8 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
@Override
protected Aggregator[][] makeDimValSelector(TopNParams params, int numProcessed, int numToProcess)
{
final AggregatorArrayProvider provider = new AggregatorArrayProvider(
params.getDimSelector(),
query,
params.getCardinality()
);
// Unlike regular topN we cannot rely on ordering to optimize.
// Optimization possibly requires a reverse lookup from value to ID, which is
// not possible when applying an extraction function
return provider.build();
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
return selectorPlus.getColumnSelectorStrategy().getDimExtractionRowSelector(query, params, capabilities);
}
@Override
@ -80,7 +72,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
}
@Override
protected Map<String, Aggregator[]> makeDimValAggregateStore(TopNParams params)
protected Map<Comparable, Aggregator[]> makeDimValAggregateStore(TopNParams params)
{
return Maps.newHashMap();
}
@ -89,35 +81,21 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
public void scanAndAggregate(
TopNParams params,
Aggregator[][] rowSelector,
Map<String, Aggregator[]> aggregatesStore,
Map<Comparable, Aggregator[]> aggregatesStore,
int numProcessed
)
{
final Cursor cursor = params.getCursor();
final DimensionSelector dimSelector = params.getDimSelector();
final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus = params.getSelectorPlus();
while (!cursor.isDone()) {
final IndexedInts dimValues = dimSelector.getRow();
for (int i = 0; i < dimValues.size(); ++i) {
final int dimIndex = dimValues.get(i);
Aggregator[] theAggregators = rowSelector[dimIndex];
if (theAggregators == null) {
final String key = dimSelector.lookupName(dimIndex);
theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
rowSelector[dimIndex] = theAggregators;
}
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
}
selectorPlus.getColumnSelectorStrategy().dimExtractionScanAndAggregate(
query,
selectorPlus.getSelector(),
cursor,
rowSelector,
aggregatesStore
);
cursor.advance();
}
}
@ -126,11 +104,11 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
protected void updateResults(
TopNParams params,
Aggregator[][] rowSelector,
Map<String, Aggregator[]> aggregatesStore,
Map<Comparable, Aggregator[]> aggregatesStore,
TopNResultBuilder resultBuilder
)
{
for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
for (Map.Entry<Comparable, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null && aggs.length > 0) {
Object[] vals = new Object[aggs.length];
@ -139,7 +117,7 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
}
resultBuilder.addEntry(
entry.getKey(),
entry.getKey() == null ? null : entry.getKey().toString(),
entry.getKey(),
vals
);
@ -148,9 +126,9 @@ public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][]
}
@Override
protected void closeAggregators(Map<String, Aggregator[]> stringMap)
protected void closeAggregators(Map<Comparable, Aggregator[]> valueMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator[] aggregators : valueMap.values()) {
for (Aggregator agg : aggregators) {
agg.close();
}

View File

@ -24,6 +24,7 @@ import io.druid.collections.StupidPool;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
@ -57,13 +58,14 @@ public class PooledTopNAlgorithm
@Override
public PooledTopNParams makeInitParams(
DimensionSelector dimSelector, Cursor cursor
ColumnSelectorPlus selectorPlus, Cursor cursor
)
{
ResourceHolder<ByteBuffer> resultsBufHolder = bufferPool.take();
ByteBuffer resultsBuf = resultsBufHolder.get();
resultsBuf.clear();
final DimensionSelector dimSelector = (DimensionSelector) selectorPlus.getSelector();
final int cardinality = dimSelector.getValueCardinality();
if (cardinality < 0) {
@ -103,7 +105,7 @@ public class PooledTopNAlgorithm
final int numValuesPerPass = numBytesPerRecord > 0 ? numBytesToWorkWith / numBytesPerRecord : cardinality;
return PooledTopNParams.builder()
.withDimSelector(dimSelector)
.withSelectorPlus(selectorPlus)
.withCursor(cursor)
.withResultsBufHolder(resultsBufHolder)
.withResultsBuf(resultsBuf)
@ -507,7 +509,7 @@ public class PooledTopNAlgorithm
private final TopNMetricSpecBuilder<int[]> arrayProvider;
public PooledTopNParams(
DimensionSelector dimSelector,
ColumnSelectorPlus selectorPlus,
Cursor cursor,
ResourceHolder<ByteBuffer> resultsBufHolder,
ByteBuffer resultsBuf,
@ -517,7 +519,7 @@ public class PooledTopNAlgorithm
TopNMetricSpecBuilder<int[]> arrayProvider
)
{
super(dimSelector, cursor, numValuesPerPass);
super(selectorPlus, cursor, numValuesPerPass);
this.resultsBufHolder = resultsBufHolder;
this.resultsBuf = resultsBuf;
@ -558,7 +560,7 @@ public class PooledTopNAlgorithm
public static class Builder
{
private DimensionSelector dimSelector;
private ColumnSelectorPlus selectorPlus;
private Cursor cursor;
private ResourceHolder<ByteBuffer> resultsBufHolder;
private ByteBuffer resultsBuf;
@ -569,7 +571,7 @@ public class PooledTopNAlgorithm
public Builder()
{
dimSelector = null;
selectorPlus = null;
cursor = null;
resultsBufHolder = null;
resultsBuf = null;
@ -579,9 +581,9 @@ public class PooledTopNAlgorithm
arrayProvider = null;
}
public Builder withDimSelector(DimensionSelector dimSelector)
public Builder withSelectorPlus(ColumnSelectorPlus selectorPlus)
{
this.dimSelector = dimSelector;
this.selectorPlus = selectorPlus;
return this;
}
@ -630,7 +632,7 @@ public class PooledTopNAlgorithm
public PooledTopNParams build()
{
return new PooledTopNParams(
dimSelector,
selectorPlus,
cursor,
resultsBufHolder,
resultsBuf,

View File

@ -21,6 +21,7 @@ package io.druid.query.topn;
import com.google.common.collect.Maps;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
@ -40,10 +41,10 @@ public class TimeExtractionTopNAlgorithm extends BaseTopNAlgorithm<int[], Map<St
@Override
public TopNParams makeInitParams(DimensionSelector dimSelector, Cursor cursor)
public TopNParams makeInitParams(ColumnSelectorPlus selectorPlus, Cursor cursor)
{
return new TopNParams(
dimSelector,
selectorPlus,
cursor,
Integer.MAX_VALUE
);

View File

@ -19,9 +19,10 @@
package io.druid.query.topn;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.topn.types.TopNColumnSelectorStrategy;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
/**
*/
@ -31,7 +32,7 @@ public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
public static final int INIT_POSITION_VALUE = -1;
public static final int SKIP_POSITION_VALUE = -2;
public TopNParams makeInitParams(DimensionSelector dimSelector, Cursor cursor);
public TopNParams makeInitParams(ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus, Cursor cursor);
public void run(
Parameters params,

View File

@ -20,12 +20,17 @@
package io.druid.query.topn;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.query.Result;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.topn.types.TopNStrategyFactory;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionHandlerUtils;
public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
{
private static final TopNStrategyFactory STRATEGY_FACTORY = new TopNStrategyFactory();
private final TopNQuery query;
private final TopNAlgorithm topNAlgorithm;
@ -42,16 +47,19 @@ public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
@SuppressWarnings("unchecked")
public Result<TopNResultValue> apply(Cursor cursor)
{
final DimensionSelector dimSelector = cursor.makeDimensionSelector(
query.getDimensionSpec()
final ColumnSelectorPlus[] selectorPlusArray = DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
Lists.newArrayList(query.getDimensionSpec()),
cursor
);
if (dimSelector == null) {
if (selectorPlusArray[0].getSelector() == null) {
return null;
}
TopNParams params = null;
try {
params = topNAlgorithm.makeInitParams(dimSelector, cursor);
params = topNAlgorithm.makeInitParams(selectorPlusArray[0], cursor);
TopNResultBuilder resultBuilder = BaseTopNAlgorithm.makeResultBuilder(params, query);

View File

@ -19,6 +19,8 @@
package io.druid.query.topn;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.topn.types.TopNColumnSelectorStrategy;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
@ -26,20 +28,20 @@ import io.druid.segment.DimensionSelector;
*/
public class TopNParams
{
private final DimensionSelector dimSelector;
private final Cursor cursor;
private final int cardinality;
private final int numValuesPerPass;
private final ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus;
protected TopNParams(
DimensionSelector dimSelector,
ColumnSelectorPlus<TopNColumnSelectorStrategy> selectorPlus,
Cursor cursor,
int numValuesPerPass
)
{
this.dimSelector = dimSelector;
this.selectorPlus = selectorPlus;
this.cursor = cursor;
this.cardinality = dimSelector.getValueCardinality();
this.cardinality = selectorPlus.getColumnSelectorStrategy().getCardinality(selectorPlus.getSelector());
this.numValuesPerPass = numValuesPerPass;
if (cardinality < 0) {
@ -47,9 +49,16 @@ public class TopNParams
}
}
// Only used by TopN algorithms that support String exclusively
// Otherwise, get an appropriately typed selector from getSelectorPlus()
public DimensionSelector getDimSelector()
{
return dimSelector;
return (DimensionSelector) selectorPlus.getSelector();
}
public ColumnSelectorPlus<TopNColumnSelectorStrategy> getSelectorPlus()
{
return selectorPlus;
}
public Cursor getCursor()

View File

@ -94,7 +94,6 @@ public class TopNQueryEngine
{
final Capabilities capabilities = adapter.getCapabilities();
final String dimension = query.getDimensionSpec().getDimension();
final int cardinality = adapter.getDimensionCardinality(dimension);
int numBytesPerRecord = 0;

View File

@ -0,0 +1,69 @@
package io.druid.query.topn.types;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.topn.BaseTopNAlgorithm;
import io.druid.query.topn.TopNParams;
import io.druid.query.topn.TopNQuery;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.util.Map;
public class StringTopNColumnSelectorStrategy implements TopNColumnSelectorStrategy<DimensionSelector>
{
@Override
public int getCardinality(DimensionSelector selector)
{
return selector.getValueCardinality();
}
@Override
public Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, Capabilities capabilities)
{
// This method is used for the DimExtractionTopNAlgorithm only.
// Unlike regular topN we cannot rely on ordering to optimize.
// Optimization possibly requires a reverse lookup from value to ID, which is
// not possible when applying an extraction function
final BaseTopNAlgorithm.AggregatorArrayProvider provider = new BaseTopNAlgorithm.AggregatorArrayProvider(
(DimensionSelector) params.getSelectorPlus().getSelector(),
query,
params.getCardinality(),
capabilities
);
return provider.build();
}
@Override
public void dimExtractionScanAndAggregate(
final TopNQuery query,
DimensionSelector selector,
Cursor cursor,
Aggregator[][] rowSelector,
Map<Comparable, Aggregator[]> aggregatesStore
)
{
final IndexedInts dimValues = selector.getRow();
for (int i = 0; i < dimValues.size(); ++i) {
final int dimIndex = dimValues.get(i);
Aggregator[] theAggregators = rowSelector[dimIndex];
if (theAggregators == null) {
final String key = selector.lookupName(dimIndex);
theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = BaseTopNAlgorithm.makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
rowSelector[dimIndex] = theAggregators;
}
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn.types;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.query.topn.TopNParams;
import io.druid.query.topn.TopNQuery;
import io.druid.segment.Capabilities;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.Cursor;
import java.util.Map;
public interface TopNColumnSelectorStrategy<ValueSelectorType extends ColumnValueSelector> extends ColumnSelectorStrategy
{
int getCardinality(ValueSelectorType selector);
/**
* Used by DimExtractionTopNAlgorithm.
*
* Create an Aggregator[][] using BaseTopNAlgorithm.AggregatorArrayProvider and the given parameters.
*
* As the Aggregator[][] is used as an integer-based lookup, this method is only applicable for dimension types
* that use integer row values.
*
* A dimension type that does not have integer values should return null.
*
* @param query The TopN query being served
* @param params Parameters for the TopN query being served
* @param capabilities Object indicating if dimension values are sorted
* @return an Aggregator[][] for integer-valued dimensions, null otherwise
*/
Aggregator[][] getDimExtractionRowSelector(TopNQuery query, TopNParams params, Capabilities capabilities);
/**
* Used by DimExtractionTopNAlgorithm.
*
* Read the current row from a dimension value selector, and for each row value:
* 1. Retrieve the Aggregator[] for the row value from rowSelector (fast integer lookup) or from
* aggregatesStore (slower map).
*
* 2. If the rowSelector and/or aggregatesStore did not have an entry for a particular row value,
* this function should retrieve the current Aggregator[] using BaseTopNAlgorithm.makeAggregators() and the
* provided cursor and query, storing them in rowSelector and aggregatesStore
*
* 3. Call aggregate() on each of the aggregators.
*
* If a dimension type doesn't have integer values, it should ignore rowSelector and use the aggregatesStore map only.
*
* @param query The TopN query being served.
* @param selector Dimension value selector
* @param cursor Cursor for the segment being queried
* @param rowSelector Integer lookup containing aggregators
* @param aggregatesStore Map containing aggregators
*/
void dimExtractionScanAndAggregate(
final TopNQuery query,
ValueSelectorType selector,
Cursor cursor,
Aggregator[][] rowSelector,
Map<Comparable, Aggregator[]> aggregatesStore
);
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.topn.types;
import io.druid.java.util.common.IAE;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
public class TopNStrategyFactory implements ColumnSelectorStrategyFactory<TopNColumnSelectorStrategy>
{
@Override
public TopNColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities
)
{
ValueType type = capabilities.getType();
switch(type) {
case STRING:
return new StringTopNColumnSelectorStrategy();
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
/**
* Base type for interfaces that manage column value selection, e.g. DimensionSelector, LongColumnSelector
*/
public interface ColumnValueSelector
{
}

View File

@ -26,6 +26,7 @@ import io.druid.segment.data.Indexed;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
/**
* Processing related interface
@ -61,7 +62,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
*
* @return Dimension name
*/
public String getDimensionName();
String getDimensionName();
/**
@ -70,7 +71,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
*
* @return A new DimensionIndexer object.
*/
public DimensionIndexer<EncodedType, EncodedTypeArray, ActualType> makeIndexer();
DimensionIndexer<EncodedType, EncodedTypeArray, ActualType> makeIndexer();
/**
@ -87,13 +88,13 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @return A new DimensionMergerV9 object.
*/
public DimensionMergerV9<EncodedTypeArray> makeMerger(
DimensionMergerV9<EncodedTypeArray> makeMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
);
) throws IOException;
/**
@ -110,14 +111,13 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @return A new DimensionMergerLegacy object.
*/
public DimensionMergerLegacy<EncodedTypeArray> makeLegacyMerger(
DimensionMergerLegacy<EncodedTypeArray> makeLegacyMerger(
IndexSpec indexSpec,
File outDir,
IOPeon ioPeon,
ColumnCapabilities capabilities,
ProgressIndicator progress
);
) throws IOException;
/**
* Given an array representing a single set of row value(s) for this dimension as an Object,
@ -128,7 +128,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @param dimVals Array of row values
* @return Size of dimVals
*/
public int getLengthFromEncodedArray(EncodedTypeArray dimVals);
int getLengthFromEncodedArray(EncodedTypeArray dimVals);
/**
@ -143,7 +143,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
*
* @return integer indicating comparison result of arrays
*/
public int compareSortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);
int compareSortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);
/**
@ -164,7 +164,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
*
* @return integer indicating comparison result of arrays
*/
public void validateSortedEncodedArrays(
void validateSortedEncodedArrays(
EncodedTypeArray lhs,
EncodedTypeArray rhs,
Indexed<ActualType> lhsEncodings,
@ -182,7 +182,7 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @param column Column for this dimension from a QueryableIndex
* @return The type-specific column subobject for this dimension.
*/
public Closeable getSubColumn(Column column);
Closeable getSubColumn(Column column);
/**
@ -196,5 +196,5 @@ public interface DimensionHandler<EncodedType extends Comparable<EncodedType>, E
* @param currRow The index of the row to retrieve
* @return The row from "column" specified by "currRow", as an array of values
*/
public Object getRowValueArrayFromColumn(Closeable column, int currRow);
Object getRowValueArrayFromColumn(Closeable column, int currRow);
}

View File

@ -1,51 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.java.util.common.IAE;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;
public final class DimensionHandlerUtil
{
private DimensionHandlerUtil() {}
public static DimensionHandler getHandlerFromCapabilities(
String dimensionName,
ColumnCapabilities capabilities,
MultiValueHandling multiValueHandling
)
{
DimensionHandler handler = null;
if (capabilities.getType() == ValueType.STRING) {
if (!capabilities.isDictionaryEncoded() || !capabilities.hasBitmapIndexes()) {
throw new IAE("String column must have dictionary encoding and bitmap index.");
}
// use default behavior
multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
handler = new StringDimensionHandler(dimensionName, multiValueHandling);
}
if (handler == null) {
throw new IAE("Could not create handler from invalid column type: " + capabilities.getType());
}
return handler;
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment;
import io.druid.java.util.common.IAE;
import io.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
import java.util.List;
public final class DimensionHandlerUtils
{
private DimensionHandlerUtils() {}
public final static ColumnCapabilities DEFAULT_STRING_CAPABILITIES =
new ColumnCapabilitiesImpl().setType(ValueType.STRING)
.setDictionaryEncoded(true)
.setHasBitmapIndexes(true);
public static DimensionHandler getHandlerFromCapabilities(
String dimensionName,
ColumnCapabilities capabilities,
MultiValueHandling multiValueHandling
)
{
if (capabilities == null) {
return new StringDimensionHandler(dimensionName, multiValueHandling);
}
multiValueHandling = multiValueHandling == null ? MultiValueHandling.ofDefault() : multiValueHandling;
if (capabilities.getType() == ValueType.STRING) {
if (!capabilities.isDictionaryEncoded() || !capabilities.hasBitmapIndexes()) {
throw new IAE("String column must have dictionary encoding and bitmap index.");
}
return new StringDimensionHandler(dimensionName, multiValueHandling);
}
// Return a StringDimensionHandler by default (null columns will be treated as String typed)
return new StringDimensionHandler(dimensionName, multiValueHandling);
}
/**
* Creates an array of ColumnSelectorPlus objects, selectors that handle type-specific operations within
* query processing engines, using a strategy factory provided by the query engine. One ColumnSelectorPlus
* will be created for each column specified in dimensionSpecs.
*
* The ColumnSelectorPlus provides access to a type strategy (e.g., how to group on a float column)
* and a value selector for a single column.
*
* A caller should define a strategy factory that provides an interface for type-specific operations
* in a query engine. See GroupByStrategyFactory for a reference.
*
* @param <ColumnSelectorStrategyClass> The strategy type created by the provided strategy factory.
* @param strategyFactory A factory provided by query engines that generates type-handling strategies
* @param dimensionSpecs The set of columns to generate ColumnSelectorPlus objects for
* @param cursor Used to create value selectors for columns.
* @return An array of ColumnSelectorPlus objects, in the order of the columns specified in dimensionSpecs
*/
public static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorPlus<ColumnSelectorStrategyClass>[] createColumnSelectorPluses(
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
List<DimensionSpec> dimensionSpecs,
ColumnSelectorFactory cursor
)
{
int dimCount = dimensionSpecs.size();
ColumnSelectorPlus<ColumnSelectorStrategyClass>[] dims = new ColumnSelectorPlus[dimCount];
for (int i = 0; i < dimCount; i++) {
final DimensionSpec dimSpec = dimensionSpecs.get(i);
final String dimName = dimSpec.getDimension();
ColumnSelectorStrategyClass strategy = makeStrategy(
strategyFactory,
dimName,
cursor.getColumnCapabilities(dimSpec.getDimension())
);
final ColumnValueSelector selector = getColumnValueSelectorFromDimensionSpec(
dimSpec,
cursor
);
final ColumnSelectorPlus<ColumnSelectorStrategyClass> selectorPlus = new ColumnSelectorPlus<>(
dimName,
dimSpec.getOutputName(),
strategy,
selector
);
dims[i] = selectorPlus;
}
return dims;
}
// When determining the capabilites of a column during query processing, this function
// adjusts the capabilities for columns that cannot be handled as-is to manageable defaults
// (e.g., treating missing columns as empty String columns)
private static ColumnCapabilities getEffectiveCapabilities(
String dimName,
ColumnCapabilities capabilities
)
{
if (capabilities == null) {
capabilities = DEFAULT_STRING_CAPABILITIES;
}
// non-Strings aren't actually supported yet
if (capabilities.getType() != ValueType.STRING) {
capabilities = DEFAULT_STRING_CAPABILITIES;
}
return capabilities;
}
private static ColumnValueSelector getColumnValueSelectorFromDimensionSpec(
DimensionSpec dimSpec,
ColumnSelectorFactory columnSelectorFactory
)
{
String dimName = dimSpec.getDimension();
ColumnCapabilities capabilities = columnSelectorFactory.getColumnCapabilities(dimName);
capabilities = getEffectiveCapabilities(dimName, capabilities);
switch (capabilities.getType()) {
case STRING:
return columnSelectorFactory.makeDimensionSelector(dimSpec);
default:
return null;
}
}
private static <ColumnSelectorStrategyClass extends ColumnSelectorStrategy> ColumnSelectorStrategyClass makeStrategy(
ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass> strategyFactory,
String dimName,
ColumnCapabilities capabilities
)
{
capabilities = getEffectiveCapabilities(dimName, capabilities);
return strategyFactory.makeColumnSelectorStrategy(capabilities);
}
}

View File

@ -119,7 +119,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
*
* @return An array containing an encoded representation of the input row value.
*/
public EncodedTypeArray processRowValsToUnsortedEncodedArray(Object dimValues);
EncodedTypeArray processRowValsToUnsortedEncodedArray(Object dimValues);
/**
@ -132,7 +132,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param unsortedIntermediateValue value to convert
* @return converted value
*/
public EncodedType getSortedEncodedValueFromUnsorted(EncodedType unsortedIntermediateValue);
EncodedType getSortedEncodedValueFromUnsorted(EncodedType unsortedIntermediateValue);
/**
@ -145,7 +145,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param sortedIntermediateValue value to convert
* @return converted value
*/
public EncodedType getUnsortedEncodedValueFromSorted(EncodedType sortedIntermediateValue);
EncodedType getUnsortedEncodedValueFromSorted(EncodedType sortedIntermediateValue);
/**
@ -159,7 +159,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
*
* @return Sorted index of actual values
*/
public Indexed<ActualType> getSortedIndexedValues();
Indexed<ActualType> getSortedIndexedValues();
/**
@ -177,7 +177,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
*
* @return min value
*/
public ActualType getMinValue();
ActualType getMinValue();
/**
@ -185,7 +185,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
*
* @return max value
*/
public ActualType getMaxValue();
ActualType getMaxValue();
/**
@ -193,7 +193,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
*
* @return value cardinality
*/
public int getCardinality();
int getCardinality();
/**
@ -210,7 +210,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param desc Descriptor object for this dimension within an IncrementalIndex
* @return A new object that reads rows from currEntry
*/
public Object makeColumnValueSelector(
Object makeColumnValueSelector(
DimensionSpec spec,
IncrementalIndexStorageAdapter.EntryHolder currEntry,
IncrementalIndex.DimensionDesc desc
@ -239,7 +239,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param rhs dimension value array from a TimeAndDims key
* @return comparison of the two arrays
*/
public int compareUnsortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);
int compareUnsortedEncodedArrays(EncodedTypeArray lhs, EncodedTypeArray rhs);
/**
@ -249,7 +249,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param rhs dimension value array from a TimeAndDims key
* @return true if the two arrays are equal
*/
public boolean checkUnsortedEncodedArraysEqual(EncodedTypeArray lhs, EncodedTypeArray rhs);
boolean checkUnsortedEncodedArraysEqual(EncodedTypeArray lhs, EncodedTypeArray rhs);
/**
@ -257,10 +257,10 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param key dimension value array from a TimeAndDims key
* @return hashcode of the array
*/
public int getUnsortedEncodedArrayHashCode(EncodedTypeArray key);
int getUnsortedEncodedArrayHashCode(EncodedTypeArray key);
public static final boolean LIST = true;
public static final boolean ARRAY = false;
boolean LIST = true;
boolean ARRAY = false;
/**
* Given a row value array from a TimeAndDims key, as described in the documentation for compareUnsortedEncodedArrays(),
@ -273,7 +273,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param asList if true, return an array; if false, return a list
* @return single value, array, or list containing the actual values corresponding to the encoded values in the input array
*/
public Object convertUnsortedEncodedArrayToActualArrayOrList(EncodedTypeArray key, boolean asList);
Object convertUnsortedEncodedArrayToActualArrayOrList(EncodedTypeArray key, boolean asList);
/**
@ -283,7 +283,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param key dimension value array from a TimeAndDims key
* @return array containing the sorted encoded values corresponding to the unsorted encoded values in the input array
*/
public EncodedTypeArray convertUnsortedEncodedArrayToSortedEncodedArray(EncodedTypeArray key);
EncodedTypeArray convertUnsortedEncodedArrayToSortedEncodedArray(EncodedTypeArray key);
/**
@ -307,7 +307,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param bitmapIndexes array of bitmaps, indexed by integer dimension value
* @param factory bitmap factory
*/
public void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory);
void fillBitmapsFromUnsortedEncodedArray(EncodedTypeArray key, int rowNum, MutableBitmap[] bitmapIndexes, BitmapFactory factory);
/**
@ -326,8 +326,7 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param dimIndex the array index of this indexer's dimension within the TimeAndDims key
* @return A ValueMatcher that matches a dimension value array from a TimeAndDims key against "matchValue"
*/
public ValueMatcher makeIndexingValueMatcher(String matchValue, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex);
ValueMatcher makeIndexingValueMatcher(String matchValue, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex);
/**
* Return a ValueMatcher that accepts an EntryHolder containing the current TimeAndDims key and the array index of this
@ -350,5 +349,5 @@ public interface DimensionIndexer<EncodedType extends Comparable<EncodedType>, E
* @param dimIndex the array index of this indexer's dimension within the TimeAndDims key
* @return A ValueMatcher that applies a predicate from the predicateFactory to the dimension values in the TimeAndDim keys
*/
public ValueMatcher makeIndexingValueMatcher(DruidPredicateFactory predicateFactory, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex);
ValueMatcher makeIndexingValueMatcher(DruidPredicateFactory predicateFactory, IncrementalIndexStorageAdapter.EntryHolder holder, int dimIndex);
}

View File

@ -68,7 +68,7 @@ public interface DimensionMerger<EncodedTypedArray>
* @param adapters List of adapters to be merged.
* @throws IOException
*/
public void writeMergedValueMetadata(List<IndexableAdapter> adapters) throws IOException;
void writeMergedValueMetadata(List<IndexableAdapter> adapters) throws IOException;
/**
@ -86,7 +86,7 @@ public interface DimensionMerger<EncodedTypedArray>
* @param segmentRow A row from a segment to be converted to its representation within the merged sequence of rows.
* @param segmentIndexNumber Integer indicating which segment the row originated from.
*/
public EncodedTypedArray convertSegmentRowValuesToMergedRowValues(EncodedTypedArray segmentRow, int segmentIndexNumber);
EncodedTypedArray convertSegmentRowValuesToMergedRowValues(EncodedTypedArray segmentRow, int segmentIndexNumber);
/**
@ -101,7 +101,7 @@ public interface DimensionMerger<EncodedTypedArray>
* @param rowValues The row values to be added.
* @throws IOException
*/
public void processMergedRow(EncodedTypedArray rowValues) throws IOException;
void processMergedRow(EncodedTypedArray rowValues) throws IOException;
/**
@ -125,7 +125,7 @@ public interface DimensionMerger<EncodedTypedArray>
* @param closer Add Closeables for resource cleanup to this Closer if needed
* @throws IOException
*/
public void writeIndexes(List<IntBuffer> segmentRowNumConversions, Closer closer) throws IOException;
void writeIndexes(List<IntBuffer> segmentRowNumConversions, Closer closer) throws IOException;
/**
@ -135,5 +135,5 @@ public interface DimensionMerger<EncodedTypedArray>
*
* @return true if this dimension can be excluded from the merged segment.
*/
public boolean canSkip();
boolean canSkip();
}

View File

@ -23,6 +23,7 @@ import com.google.common.io.ByteSink;
import com.google.common.io.OutputSupplier;
import io.druid.common.guava.FileOutputSupplier;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@ -41,7 +42,7 @@ public interface DimensionMergerLegacy<EncodedTypeArray> extends DimensionMerger
* @param valueEncodingFile Destination file
* @throws IOException
*/
public void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException;
void writeValueMetadataToFile(FileOutputSupplier valueEncodingFile) throws IOException;
/**
@ -49,7 +50,7 @@ public interface DimensionMergerLegacy<EncodedTypeArray> extends DimensionMerger
* @param rowValueOut Destination file
* @throws IOException
*/
public void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException;
void writeRowValuesToFile(FileOutputSupplier rowValueOut) throws IOException;
/**
@ -58,8 +59,11 @@ public interface DimensionMergerLegacy<EncodedTypeArray> extends DimensionMerger
* @param spatialOut Destination file for spatial indexes
* @throws IOException
*/
public void writeIndexesToFiles(
void writeIndexesToFiles(
ByteSink invertedOut,
OutputSupplier<FileOutputStream> spatialOut
) throws IOException;
File makeDimFile() throws IOException;
}

View File

@ -21,6 +21,8 @@ package io.druid.segment;
import io.druid.segment.column.ColumnDescriptor;
import java.io.IOException;
/**
* Processing related interface
*
@ -34,5 +36,5 @@ public interface DimensionMergerV9<EncodedTypeArray> extends DimensionMerger<Enc
*
* @return ColumnDescriptor that IndexMergerV9 will use to build a column.
*/
public ColumnDescriptor makeColumnDescriptor();
ColumnDescriptor makeColumnDescriptor() throws IOException;
}

View File

@ -21,7 +21,7 @@ package io.druid.segment;import io.druid.segment.data.IndexedInts;
/**
*/
public interface DimensionSelector
public interface DimensionSelector extends ColumnValueSelector
{
public static int CARDINALITY_UNKNOWN = -1;

View File

@ -24,7 +24,7 @@ package io.druid.segment;
* FloatColumnSelector has a handle onto some other stateful object (e.g. an Offset) which is changing between calls
* to get() (though, that doesn't have to be the case if you always want the same value...).
*/
public interface FloatColumnSelector
public interface FloatColumnSelector extends ColumnValueSelector
{
public float get();
}

View File

@ -691,7 +691,7 @@ public class IndexMerger
mergers.add(merger);
merger.writeMergedValueMetadata(indexes);
FileOutputSupplier dimOut = new FileOutputSupplier(IndexIO.makeDimFile(v8OutDir, mergedDimensions.get(i)), true);
FileOutputSupplier dimOut = new FileOutputSupplier(merger.makeDimFile(), true);
merger.writeValueMetadataToFile(dimOut);
dimOuts.add(dimOut);
}
@ -912,7 +912,7 @@ public class IndexMerger
for (int i = 0; i < mergedDimensions.size(); i++) {
ColumnCapabilities capabilities = dimCapabilities.get(i);
String dimName = mergedDimensions.get(i);
handlers[i] = DimensionHandlerUtil.getHandlerFromCapabilities(dimName, capabilities, null);
handlers[i] = DimensionHandlerUtils.getHandlerFromCapabilities(dimName, capabilities, null);
}
return handlers;
}

View File

@ -21,7 +21,7 @@ package io.druid.segment;
/**
*/
public interface LongColumnSelector
public interface LongColumnSelector extends ColumnValueSelector
{
public long get();
}

View File

@ -19,7 +19,7 @@
package io.druid.segment;
public interface ObjectColumnSelector<T>
public interface ObjectColumnSelector<T> extends ColumnValueSelector
{
public Class<T> classOfObject();
public T get();

View File

@ -20,9 +20,7 @@
package io.druid.segment;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -36,6 +34,7 @@ import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.math.expr.Expr;
import io.druid.math.expr.Parser;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.QueryInterruptedException;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
@ -46,6 +45,8 @@ import io.druid.query.filter.DruidPredicateFactory;
import io.druid.query.filter.Filter;
import io.druid.query.filter.RowOffsetMatcherFactory;
import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategy;
import io.druid.query.filter.ValueMatcherColumnSelectorStrategyFactory;
import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
@ -303,7 +304,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return Sequences.filter(
new CursorSequenceBuilder(
index,
this,
actualInterval,
virtualColumns,
gran,
@ -329,7 +330,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private static class CursorSequenceBuilder
{
private final ColumnSelector index;
private final StorageAdapter storageAdapter;
private final QueryableIndex index;
private final Interval interval;
private final VirtualColumns virtualColumns;
private final QueryGranularity gran;
@ -341,7 +343,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private final ColumnSelectorBitmapIndexSelector bitmapIndexSelector;
public CursorSequenceBuilder(
ColumnSelector index,
QueryableIndexStorageAdapter storageAdapter,
Interval interval,
VirtualColumns virtualColumns,
QueryGranularity gran,
@ -353,7 +355,8 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
ColumnSelectorBitmapIndexSelector bitmapIndexSelector
)
{
this.index = index;
this.storageAdapter = storageAdapter;
this.index = storageAdapter.index;
this.interval = interval;
this.virtualColumns = virtualColumns;
this.gran = gran;
@ -925,7 +928,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
return new QueryableIndexBaseCursor()
{
CursorOffsetHolderValueMatcherFactory valueMatcherFactory = new CursorOffsetHolderValueMatcherFactory(
index,
storageAdapter,
this
);
RowOffsetMatcherFactory rowOffsetMatcherFactory = new CursorOffsetHolderRowOffsetMatcherFactory(
@ -1039,98 +1042,64 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private static class CursorOffsetHolderValueMatcherFactory implements ValueMatcherFactory
{
private final ColumnSelector index;
private static final ValueMatcherColumnSelectorStrategyFactory STRATEGY_FACTORY =
new ValueMatcherColumnSelectorStrategyFactory();
private final StorageAdapter storageAdapter;
private final ColumnSelectorFactory cursor;
private final List<String> availableMetrics;
public CursorOffsetHolderValueMatcherFactory(
ColumnSelector index,
StorageAdapter storageAdapter,
ColumnSelectorFactory cursor
)
{
this.index = index;
this.storageAdapter = storageAdapter;
this.cursor = cursor;
this.availableMetrics = Lists.newArrayList(storageAdapter.getAvailableMetrics());
}
@Override
public ValueMatcher makeValueMatcher(String dimension, final String value)
{
if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) {
if (getTypeForDimension(dimension) == ValueType.LONG) {
return Filters.getLongValueMatcher(
cursor.makeLongColumnSelector(dimension),
value
);
}
}
final DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(dimension, dimension)
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
cursor
);
// if matching against null, rows with size 0 should also match
final boolean matchNull = Strings.isNullOrEmpty(value);
final int id = selector.lookupId(value);
if (id < 0) {
return new BooleanValueMatcher(false);
} else {
return new ValueMatcher()
{
@Override
public boolean matches()
{
IndexedInts row = selector.getRow();
if (row.size() == 0) {
return matchNull;
}
for (int i = 0; i < row.size(); i++) {
if (row.get(i) == id) {
return true;
}
}
return false;
}
};
}
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
return strategy.getValueMatcher(dimension, cursor, value);
}
@Override
public ValueMatcher makeValueMatcher(String dimension, final DruidPredicateFactory predicateFactory)
{
ValueType type = getTypeForDimension(dimension);
switch (type) {
case LONG:
if (dimension.equals(Column.TIME_COLUMN_NAME) || availableMetrics.contains(dimension)) {
if (getTypeForDimension(dimension) == ValueType.LONG) {
return makeLongValueMatcher(dimension, predicateFactory.makeLongPredicate());
case STRING:
return makeStringValueMatcher(dimension, predicateFactory.makeStringPredicate());
default:
return new BooleanValueMatcher(predicateFactory.makeStringPredicate().apply(null));
}
}
private ValueMatcher makeStringValueMatcher(String dimension, final Predicate<String> predicate)
{
final DimensionSelector selector = cursor.makeDimensionSelector(
new DefaultDimensionSpec(dimension, dimension)
ColumnSelectorPlus<ValueMatcherColumnSelectorStrategy>[] selector =
DimensionHandlerUtils.createColumnSelectorPluses(
STRATEGY_FACTORY,
ImmutableList.<DimensionSpec>of(DefaultDimensionSpec.of(dimension)),
cursor
);
return new ValueMatcher()
{
final boolean matchNull = predicate.apply(null);
@Override
public boolean matches()
{
IndexedInts row = selector.getRow();
if (row.size() == 0) {
return matchNull;
}
for (int i = 0; i < row.size(); i++) {
if (predicate.apply(selector.lookupName(row.get(i)))) {
return true;
}
}
return false;
}
};
final ValueMatcherColumnSelectorStrategy strategy = selector[0].getColumnSelectorStrategy();
return strategy.getValueMatcher(dimension, cursor, predicateFactory);
}
private ValueMatcher makeLongValueMatcher(String dimension, final DruidLongPredicate predicate)
@ -1143,7 +1112,7 @@ public class QueryableIndexStorageAdapter implements StorageAdapter
private ValueType getTypeForDimension(String dimension)
{
ColumnCapabilities capabilities = getColumnCapabilites(index, dimension);
ColumnCapabilities capabilities = cursor.getColumnCapabilities(dimension);
return capabilities == null ? ValueType.STRING : capabilities.getType();
}
}

View File

@ -124,7 +124,7 @@ public class SimpleQueryableIndex implements QueryableIndex
{
for (String dim : availableDimensions) {
ColumnCapabilities capabilities = getColumn(dim).getCapabilities();
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities, null);
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null);
dimensionHandlers.put(dim, handler);
}
}

View File

@ -32,7 +32,6 @@ import java.io.Closeable;
import java.io.File;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Comparator;
public class StringDimensionHandler implements DimensionHandler<Integer, int[], String>
{
@ -214,20 +213,4 @@ public class StringDimensionHandler implements DimensionHandler<Integer, int[],
{
return new StringDimensionMergerLegacy(dimensionName, indexSpec, outDir, ioPeon, capabilities, progress);
}
public static final Comparator<Integer> ENCODED_COMPARATOR = new Comparator<Integer>()
{
@Override
public int compare(Integer o1, Integer o2)
{
if (o1 == null) {
return o2 == null ? 0 : -1;
}
if (o2 == null) {
return 1;
}
return o1.compareTo(o2);
}
};
}

View File

@ -213,4 +213,12 @@ public class StringDimensionMergerLegacy extends StringDimensionMergerV9 impleme
spatialIoPeon.cleanup();
}
}
@Override
public File makeDimFile() throws IOException
{
return IndexIO.makeDimFile(outDir, dimensionName);
}
}

View File

@ -49,7 +49,7 @@ import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionHandler;
import io.druid.segment.DimensionHandlerUtil;
import io.druid.segment.DimensionHandlerUtils;
import io.druid.segment.DimensionIndexer;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
@ -206,8 +206,8 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
// This ColumnSelectorFactory implementation has no knowledge of column capabilities.
// However, this method may still be called by FilteredAggregatorFactory's ValueMatcherFactory
// to check column types.
// Just return null, the caller will assume default types in that case.
return null;
// If column capabilities are not available, return null, the caller will assume default types in that case.
return columnCapabilities == null ? null : columnCapabilities.get(columnName);
}
@Override
@ -407,7 +407,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
if (dimSchema.getTypeName().equals(DimensionSchema.SPATIAL_TYPE_NAME)) {
capabilities.setHasSpatialIndexes(true);
} else {
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(
dimName,
capabilities,
dimSchema.getMultiValueHandling()
@ -567,7 +567,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
capabilities.setHasBitmapIndexes(true);
columnCapabilities.put(dimension, capabilities);
}
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dimension, capabilities, null);
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dimension, capabilities, null);
desc = addNewDimension(dimension, capabilities, handler);
}
DimensionHandler handler = desc.getHandler();
@ -747,7 +747,7 @@ public abstract class IncrementalIndex<AggregatorType> implements Iterable<Row>,
if (dimensionDescs.get(dim) == null) {
ColumnCapabilitiesImpl capabilities = oldColumnCapabilities.get(dim);
columnCapabilities.put(dim, capabilities);
DimensionHandler handler = DimensionHandlerUtil.getHandlerFromCapabilities(dim, capabilities, null);
DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null);
addNewDimension(dim, capabilities, handler);
}
}

View File

@ -139,6 +139,7 @@ public class MultiValuedDimensionTest
"2011-01-12T00:00:00.000Z,product_1,t1\tt2\tt3",
"2011-01-13T00:00:00.000Z,product_2,t3\tt4\tt5",
"2011-01-14T00:00:00.000Z,product_3,t5\tt6\tt7",
"2011-01-14T00:00:00.000Z,product_4"
};
for (String row : rows) {
@ -180,6 +181,7 @@ public class MultiValuedDimensionTest
);
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", null, "count", 2L),
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t1", "count", 2L),
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t2", "count", 2L),
GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t3", "count", 4L),

View File

@ -28,6 +28,9 @@ import com.google.common.collect.DiscreteDomain;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.DimensionSelector;
@ -41,6 +44,7 @@ public class CardinalityAggregatorBenchmark extends SimpleBenchmark
CardinalityBufferAggregator agg;
List<DimensionSelector> selectorList;
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfoList;
ByteBuffer buf;
int pos;
@ -75,16 +79,24 @@ public class CardinalityAggregatorBenchmark extends SimpleBenchmark
.cycle()
.limit(MAX);
final DimensionSpec dimSpec1 = new DefaultDimensionSpec("dim1", "dim1");
final CardinalityAggregatorTest.TestDimensionSelector dim1 =
new CardinalityAggregatorTest.TestDimensionSelector(values, null);
final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> dimInfo1 = new ColumnSelectorPlus(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(),
dim1
);
selectorList = Lists.newArrayList(
(DimensionSelector) dim1
);
dimInfoList = Lists.newArrayList(dimInfo1);
agg = new CardinalityBufferAggregator(
selectorList,
dimInfoList,
byRow
);

View File

@ -28,9 +28,12 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.js.JavaScriptConfig;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ExtractionDimensionSpec;
@ -244,25 +247,44 @@ public class CardinalityAggregatorTest
}
}
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfoList;
List<DimensionSelector> selectorList;
CardinalityAggregatorFactory rowAggregatorFactory;
CardinalityAggregatorFactory valueAggregatorFactory;
final TestDimensionSelector dim1;
final TestDimensionSelector dim2;
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfoListWithExtraction;
List<DimensionSelector> selectorListWithExtraction;
final TestDimensionSelector dim1WithExtraction;
final TestDimensionSelector dim2WithExtraction;
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfoListConstantVal;
List<DimensionSelector> selectorListConstantVal;
final TestDimensionSelector dim1ConstantVal;
final TestDimensionSelector dim2ConstantVal;
final DimensionSpec dimSpec1 = new DefaultDimensionSpec("dim1", "dim1");
final DimensionSpec dimSpec2 = new DefaultDimensionSpec("dim2", "dim2");
public CardinalityAggregatorTest()
{
dim1 = new TestDimensionSelector(values1, null);
dim2 = new TestDimensionSelector(values2, null);
dimInfoList = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1
),
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec2.getDimension(),
dimSpec2.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2
)
);
selectorList = Lists.newArrayList(
(DimensionSelector) dim1,
dim2
@ -271,8 +293,8 @@ public class CardinalityAggregatorTest
rowAggregatorFactory = new CardinalityAggregatorFactory(
"billy",
Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("dim1", "dim1"),
new DefaultDimensionSpec("dim2", "dim2")
dimSpec1,
dimSpec2
),
true
);
@ -280,8 +302,8 @@ public class CardinalityAggregatorTest
valueAggregatorFactory = new CardinalityAggregatorFactory(
"billy",
Lists.<DimensionSpec>newArrayList(
new DefaultDimensionSpec("dim1", "dim1"),
new DefaultDimensionSpec("dim2", "dim2")
dimSpec1,
dimSpec2
),
false
);
@ -295,6 +317,18 @@ public class CardinalityAggregatorTest
(DimensionSelector) dim1WithExtraction,
dim2WithExtraction
);
dimInfoListWithExtraction = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1WithExtraction
),
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2WithExtraction
)
);
String helloJsFn = "function(str) { return 'hello' }";
ExtractionFn helloFn = new JavaScriptExtractionFn(helloJsFn, false, JavaScriptConfig.getDefault());
@ -304,13 +338,27 @@ public class CardinalityAggregatorTest
(DimensionSelector) dim1ConstantVal,
dim2ConstantVal
);
dimInfoListConstantVal = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1ConstantVal
),
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2ConstantVal
)
);
}
@Test
public void testAggregateRows() throws Exception
{
CardinalityAggregator agg = new CardinalityAggregator(
selectorList,
"billy",
dimInfoList,
true
);
@ -325,7 +373,8 @@ public class CardinalityAggregatorTest
public void testAggregateValues() throws Exception
{
CardinalityAggregator agg = new CardinalityAggregator(
selectorList,
"billy",
dimInfoList,
false
);
@ -339,7 +388,7 @@ public class CardinalityAggregatorTest
public void testBufferAggregateRows() throws Exception
{
CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
selectorList,
dimInfoList,
true
);
@ -360,7 +409,7 @@ public class CardinalityAggregatorTest
public void testBufferAggregateValues() throws Exception
{
CardinalityBufferAggregator agg = new CardinalityBufferAggregator(
selectorList,
dimInfoList,
false
);
@ -382,9 +431,23 @@ public class CardinalityAggregatorTest
{
List<DimensionSelector> selector1 = Lists.newArrayList((DimensionSelector) dim1);
List<DimensionSelector> selector2 = Lists.newArrayList((DimensionSelector) dim2);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo1 = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1
)
);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo2 = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2
)
);
CardinalityAggregator agg1 = new CardinalityAggregator(selector1, true);
CardinalityAggregator agg2 = new CardinalityAggregator(selector2, true);
CardinalityAggregator agg1 = new CardinalityAggregator("billy", dimInfo1, true);
CardinalityAggregator agg2 = new CardinalityAggregator("billy", dimInfo2, true);
for (int i = 0; i < values1.size(); ++i) {
aggregate(selector1, agg1);
@ -414,8 +477,23 @@ public class CardinalityAggregatorTest
List<DimensionSelector> selector1 = Lists.newArrayList((DimensionSelector) dim1);
List<DimensionSelector> selector2 = Lists.newArrayList((DimensionSelector) dim2);
CardinalityAggregator agg1 = new CardinalityAggregator(selector1, false);
CardinalityAggregator agg2 = new CardinalityAggregator(selector2, false);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo1 = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim1
)
);
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfo2 = Lists.newArrayList(
new ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>(
dimSpec1.getDimension(),
dimSpec1.getOutputName(),
new StringCardinalityAggregatorColumnSelectorStrategy(), dim2
)
);
CardinalityAggregator agg1 = new CardinalityAggregator("billy", dimInfo1, false);
CardinalityAggregator agg2 = new CardinalityAggregator("billy", dimInfo2, false);
for (int i = 0; i < values1.size(); ++i) {
aggregate(selector1, agg1);
@ -443,7 +521,8 @@ public class CardinalityAggregatorTest
public void testAggregateRowsWithExtraction() throws Exception
{
CardinalityAggregator agg = new CardinalityAggregator(
selectorListWithExtraction,
"billy",
dimInfoListWithExtraction,
true
);
for (int i = 0; i < values1.size(); ++i) {
@ -452,7 +531,8 @@ public class CardinalityAggregatorTest
Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05);
CardinalityAggregator agg2 = new CardinalityAggregator(
selectorListConstantVal,
"billy",
dimInfoListConstantVal,
true
);
for (int i = 0; i < values1.size(); ++i) {
@ -465,7 +545,8 @@ public class CardinalityAggregatorTest
public void testAggregateValuesWithExtraction() throws Exception
{
CardinalityAggregator agg = new CardinalityAggregator(
selectorListWithExtraction,
"billy",
dimInfoListWithExtraction,
false
);
for (int i = 0; i < values1.size(); ++i) {
@ -474,7 +555,8 @@ public class CardinalityAggregatorTest
Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get()), 0.05);
CardinalityAggregator agg2 = new CardinalityAggregator(
selectorListConstantVal,
"billy",
dimInfoListConstantVal,
false
);
for (int i = 0; i < values1.size(); ++i) {

View File

@ -794,6 +794,7 @@ public class GroupByQueryRunnerTest
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
List<Row> res = Lists.newArrayList(results);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@ -6877,4 +6878,43 @@ public class GroupByQueryRunnerTest
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByCardinalityAggOnFloat()
{
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("market", "alias")))
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new CardinalityAggregatorFactory(
"numVals",
ImmutableList.<DimensionSpec>of(new DefaultDimensionSpec(
QueryRunnerTestHelper.indexMetric,
QueryRunnerTestHelper.indexMetric
)),
false
)
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
// CardinalityAggregator currently treats non-String columns as having all nulls, so cardinality is 1 for
// the 'index' column
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "spot", "rows", 9L, "numVals", 1.0002442201269182d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "total_market", "rows", 2L, "numVals", 1.0002442201269182d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "upfront", "rows", 2L, "numVals", 1.0002442201269182d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "spot", "rows", 9L, "numVals", 1.0002442201269182d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "total_market", "rows", 2L, "numVals", 1.0002442201269182d),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "upfront", "rows", 2L, "numVals", 1.0002442201269182d)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
}

View File

@ -31,6 +31,7 @@ import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.ExtractionDimFilter;
@ -44,6 +45,7 @@ import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.search.search.SearchSortSpec;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.TestHelper;
import io.druid.segment.column.Column;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -602,6 +604,33 @@ public class SearchQueryRunnerTest
checkSearchQuery(searchQuery, expectedHits);
}
@Test
public void testSearchOnTime()
{
SearchQuery searchQuery = Druids.newSearchQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.query("Friday")
.dimensions(new ExtractionDimensionSpec(
Column.TIME_COLUMN_NAME,
"__time2",
new TimeFormatExtractionFn(
"EEEE",
null,
null,
null,
false
)
))
.build();
List<SearchHit> expectedHits = Lists.newLinkedList();
expectedHits.add(new SearchHit("__time2", "Friday", 169));
checkSearchQuery(searchQuery, expectedHits);
}
private void checkSearchQuery(Query searchQuery, List<SearchHit> expectedResults)
{