Merge pull request #1370 from metamx/dimExtractionOptimizations

Add optimizations for ExtractionFn by enabling MANY_TO_ONE vs ONE_TO_ONE codepaths
Xavier Léauté 2015-06-02 13:25:19 -07:00
commit 224886ab52
35 changed files with 2574 additions and 380 deletions


@ -146,6 +146,16 @@ Example for a regular dimension
}
```
```json
{
"type" : "javascript",
"function" : "function(str) { return str + '!!!'; }",
"injective" : true
}
```
The `injective` property specifies whether the JavaScript function preserves uniqueness. The default value is `false`, meaning uniqueness is not preserved.
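By contrast, a function that can map several distinct inputs to one output, such as a substring, is not injective, so `injective` should be left at its default (illustrative example):
```json
{
  "type" : "javascript",
  "function" : "function(str) { return str.substring(0, 3); }"
}
```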
Example for the `__time` dimension:
```json
@ -154,3 +164,40 @@ Example for the `__time` dimension:
"function" : "function(t) { return 'Second ' + Math.floor((t % 60000) / 1000); }"
}
```
### Explicit lookup extraction function
Explicit lookups allow you to specify a set of keys and values to use when performing the extraction. In the first example below missing values are retained; in the second they are replaced:
```json
{
"type":"lookup",
"lookup":{
"type":"map",
"map":{"foo":"bar", "baz":"bat"}
},
"retainMissingValue":true,
"injective":true
}
```
```json
{
"type":"lookup",
"lookup":{
"type":"map",
"map":{"foo":"bar", "baz":"bat"}
},
"retainMissingValue":false,
"injective":false,
"replaceMissingValueWith":"MISSING"
}
```
The properties `retainMissingValue` and `replaceMissingValueWith` can be specified at query time to hint how to handle missing values. Setting `replaceMissingValueWith` to `""` has the same effect as setting it to `null` or omitting the property. Setting `retainMissingValue` to `true` will use the dimension's original value if it is not found in the lookup. The default values are `replaceMissingValueWith = null` and `retainMissingValue = false`, which cause missing values to be treated as missing.
It is illegal to set `retainMissingValue = true` and also specify a `replaceMissingValueWith`.
The `injective` property specifies whether optimizations may be used which assume there is no combining of multiple names into one. For example, if ABC123 is the only key that maps to SomeCompany, that lookup is unique and can be optimized; but if ABC123 and DEF456 both map to SomeCompany, it is not a unique lookup. Setting this value to `true` while keeping `retainMissingValue` at `false` (the default) may cause undesired behavior.
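For illustration, a lookup along these lines (hypothetical values from the example above) is not injective, because two keys share one value:
```json
{
  "type":"lookup",
  "lookup":{
    "type":"map",
    "map":{"ABC123":"SomeCompany", "DEF456":"SomeCompany"}
  },
  "injective":false
}
```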
A null dimension value can be mapped to a specific value by specifying the empty string as the key.
This allows distinguishing between a null dimension and a lookup resulting in a null.
For example, specifying `{"":"bar","bat":"baz"}` with dimension values `[null, "foo", "bat"]` and replacing missing values with `"oof"` will yield results of `["bar", "oof", "baz"]`.
Omitting the empty-string key will cause the missing-value behavior to apply to null dimension values as well. For example, specifying `{"bat":"baz"}` with dimension values `[null, "foo", "bat"]` and replacing missing values with `"oof"` will yield results of `["oof", "oof", "baz"]`.
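For concreteness, the first example in the paragraph above corresponds to a spec along these lines (illustrative only):
```json
{
  "type":"lookup",
  "lookup":{
    "type":"map",
    "map":{"":"bar", "bat":"baz"}
  },
  "retainMissingValue":false,
  "injective":false,
  "replaceMissingValueWith":"oof"
}
```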


@ -60,7 +60,7 @@ public class ApproximateHistogramGroupByQueryTest
private GroupByQueryRunnerFactory factory;
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
final ObjectMapper mapper = new DefaultObjectMapper();
final StupidPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
@ -84,8 +84,10 @@ public class ApproximateHistogramGroupByQueryTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, mapper, engine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new GroupByQueryQueryToolChest(
configSupplier, mapper, engine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
pool
);
@ -106,18 +108,20 @@ public class ApproximateHistogramGroupByQueryTest
singleThreadEngine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
singleThreadedConfigSupplier,
new GroupByQueryQueryToolChest(singleThreadedConfigSupplier, mapper, singleThreadEngine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new GroupByQueryQueryToolChest(
singleThreadedConfigSupplier, mapper, singleThreadEngine, pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
pool
);
Function<Object, Object> function = new Function<Object, Object>()
final Function<Object, Object[]> function = new Function<Object, Object[]>()
{
@Override
public Object apply(@Nullable Object input)
public Object[] apply(@Nullable Object input)
{
return new Object[]{factory, ((Object[]) input)[0]};
return new Object[]{factory, input};
}
};


@ -54,38 +54,35 @@ import java.util.Map;
public class ApproximateHistogramTopNQueryTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
List<Object> retVal = Lists.newArrayList();
retVal.addAll(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
return QueryRunnerTestHelper.transformToConstructionFeeder(
Iterables.concat(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
TestQueryRunners.getPool(),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
),
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(2000);
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
)
);
retVal.addAll(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(2000);
}
}
),
new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
return retVal;
}
private final QueryRunner runner;


@ -29,7 +29,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type(name = "partial", value = MatchingDimExtractionFn.class),
@JsonSubTypes.Type(name = "searchQuery", value = SearchQuerySpecDimExtractionFn.class),
@JsonSubTypes.Type(name = "javascript", value = JavascriptExtractionFn.class),
@JsonSubTypes.Type(name = "timeFormat", value = TimeFormatExtractionFn.class)
@JsonSubTypes.Type(name = "timeFormat", value = TimeFormatExtractionFn.class),
@JsonSubTypes.Type(name = "lookup", value = LookupExtractionFn.class)
})
/**
* An ExtractionFn is a function that can be used to transform the values of a column (typically a dimension)
@ -56,8 +57,8 @@ public interface ExtractionFn
* a contract on the method rather than enforced at a lower level in order to eliminate a global check
* for extraction functions that do not already need one.
*
*
* @param value the original value of the dimension
*
* @return a value that should be used instead of the original
*/
public String apply(Object value);
@ -68,11 +69,26 @@ public interface ExtractionFn
/**
* Offers information on whether the extraction will preserve the original ordering of the values.
*
* <p/>
* Some query optimizations are possible if ordering is preserved. Null values *do* count towards
* ordering.
*
* @return true if ordering is preserved, false otherwise
*/
public boolean preservesOrdering();
/**
* A dim extraction can be of one of two types: renaming or rebucketing. In the `ONE_TO_ONE` case, a unique value is
* mapped to another unique value. In the `MANY_TO_ONE` case, there is no longer a 1:1 relation between the old dimension
* value and the new dimension value.
*
* @return {@link io.druid.query.extraction.ExtractionFn.ExtractionType} declaring what kind of manipulation this
* function does
*/
public ExtractionType getExtractionType();
public static enum ExtractionType
{
MANY_TO_ONE, ONE_TO_ONE
}
}
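To make the distinction concrete, here is a self-contained sketch (plain Java, not Druid code; all names are illustrative) of why a `ONE_TO_ONE` extraction can be deferred until after grouping, while a `MANY_TO_ONE` extraction cannot simply be applied as a rename at the end:
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ExtractionTypeSketch
{
  // Baseline: extract first, then group -- always correct.
  static Map<String, Integer> extractThenGroup(List<String> rows, Function<String, String> fn)
  {
    Map<String, Integer> out = new HashMap<>();
    for (String row : rows) {
      out.merge(fn.apply(row), 1, Integer::sum);
    }
    return out;
  }

  // Deferred: group on raw values, then rename the bucket keys at the very end
  // without re-aggregating (this is what deferring an extraction amounts to).
  static Map<String, Integer> groupThenRename(List<String> rows, Function<String, String> fn)
  {
    Map<String, Integer> raw = new HashMap<>();
    for (String row : rows) {
      raw.merge(row, 1, Integer::sum);
    }
    Map<String, Integer> out = new HashMap<>();
    for (Map.Entry<String, Integer> e : raw.entrySet()) {
      out.put(fn.apply(e.getKey()), e.getValue()); // rename only, no merging
    }
    return out;
  }

  public static void main(String[] args)
  {
    List<String> rows = Arrays.asList("ABC123", "DEF456", "ABC123");
    Function<String, String> oneToOne = s -> s + "!";        // unique in, unique out
    Function<String, String> manyToOne = s -> "SomeCompany"; // collapses both keys

    // ONE_TO_ONE: both strategies agree -> prints true
    System.out.println(extractThenGroup(rows, oneToOne).equals(groupThenRename(rows, oneToOne)));
    // MANY_TO_ONE: renaming at the end overwrites buckets instead of merging them
    System.out.println(extractThenGroup(rows, manyToOne)); // {SomeCompany=3}
    System.out.println(groupThenRename(rows, manyToOne));  // {SomeCompany=1} or {SomeCompany=2}
  }
}
```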


@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import javax.annotation.Nullable;
/**
* Functional extraction uses a function to find the new value.
* Null values in the range can either fall back to the domain's value or be replaced with "replaceMissingValueWith".
*/
public abstract class FunctionalExtraction extends DimExtractionFn
{
private final boolean retainMissingValue;
private final String replaceMissingValueWith;
private final Function<String, String> extractionFunction;
private final ExtractionType extractionType;
/**
* The general constructor which handles most of the logic for extractions which can be expressed as a function of string-->string
*
* @param extractionFunction The function to call when doing the extraction. The function must be able to accept a null input.
* @param retainMissingValue Whether a `null` result from the function should fall back to the original value rather than being kept as `null`
* @param replaceMissingValueWith String value to replace missing values with (instead of `null`)
* @param injective Should be true only if the function is a strict 1:1 rename (its range has the same cardinality as its domain); this enables optimized query paths.
*/
public FunctionalExtraction(
final Function<String, String> extractionFunction,
final boolean retainMissingValue,
final String replaceMissingValueWith,
final boolean injective
)
{
this.retainMissingValue = retainMissingValue;
this.replaceMissingValueWith = Strings.emptyToNull(replaceMissingValueWith);
Preconditions.checkArgument(
!(this.retainMissingValue && !Strings.isNullOrEmpty(this.replaceMissingValueWith)),
"Cannot specify a [replaceMissingValueWith] and set [retainMissingValue] to true"
);
// If missing values are to be retained, we have a slightly different code path
// This is intended to have the absolutely fastest code path possible and not have any extra logic in the function
if (this.retainMissingValue) {
this.extractionFunction = new Function<String, String>()
{
@Nullable
@Override
public String apply(@Nullable String dimValue)
{
final String retval = extractionFunction.apply(dimValue);
return Strings.isNullOrEmpty(retval) ? Strings.emptyToNull(dimValue) : retval;
}
};
} else {
this.extractionFunction = new Function<String, String>()
{
@Nullable
@Override
public String apply(@Nullable String dimValue)
{
final String retval = extractionFunction.apply(dimValue);
return Strings.isNullOrEmpty(retval) ? FunctionalExtraction.this.replaceMissingValueWith : retval;
}
};
}
this.extractionType = injective
? ExtractionType.ONE_TO_ONE
: ExtractionType.MANY_TO_ONE;
}
public boolean isRetainMissingValue()
{
return retainMissingValue;
}
public String getReplaceMissingValueWith()
{
return replaceMissingValueWith;
}
public boolean isInjective()
{
return ExtractionType.ONE_TO_ONE.equals(getExtractionType());
}
@Override
public String apply(String value)
{
return extractionFunction.apply(value);
}
@Override
public boolean preservesOrdering()
{
return false;
}
@Override
public ExtractionType getExtractionType()
{
return extractionType;
}
}
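A usage sketch (assumed; it mirrors the `SimpleFunctionExtraction` test subclass later in this diff, with `getCacheKey` stubbed purely for illustration) contrasting the two missing-value code paths:
```java
import com.google.common.base.Function;

// An extraction that always "misses" by returning null.
final Function<String, String> alwaysNull = new Function<String, String>()
{
  @Override
  public String apply(String input)
  {
    return null;
  }
};

final FunctionalExtraction retain = new FunctionalExtraction(alwaysNull, true, null, false)
{
  @Override
  public byte[] getCacheKey() { return new byte[0]; } // stub for illustration
};
retain.apply("foo");  // "foo" -- falls back to the original value

final FunctionalExtraction replace = new FunctionalExtraction(alwaysNull, false, "oops", false)
{
  @Override
  public byte[] getCacheKey() { return new byte[0]; } // stub for illustration
};
replace.apply("foo"); // "oops" -- replaced rather than left null
```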


@ -31,7 +31,8 @@ import java.nio.ByteBuffer;
public class JavascriptExtractionFn implements ExtractionFn
{
private static Function<Object, String> compile(String function) {
private static Function<Object, String> compile(String function)
{
final ContextFactory contextFactory = ContextFactory.getGlobal();
final Context context = contextFactory.enterContext();
context.setOptimizationLevel(9);
@ -62,16 +63,19 @@ public class JavascriptExtractionFn implements ExtractionFn
private final String function;
private final Function<Object, String> fn;
private final boolean injective;
@JsonCreator
public JavascriptExtractionFn(
@JsonProperty("function") String function
@JsonProperty("function") String function,
@JsonProperty("injective") boolean injective
)
{
Preconditions.checkNotNull(function, "function must not be null");
this.function = function;
this.fn = compile(function);
this.injective = injective;
}
@JsonProperty
@ -80,6 +84,12 @@ public class JavascriptExtractionFn implements ExtractionFn
return function;
}
@JsonProperty
public boolean isInjective()
{
return this.injective;
}
@Override
public byte[] getCacheKey()
{
@ -99,13 +109,13 @@ public class JavascriptExtractionFn implements ExtractionFn
@Override
public String apply(String value)
{
return this.apply((Object)value);
return this.apply((Object) value);
}
@Override
public String apply(long value)
{
return this.apply((Long)value);
return this.apply((Long) value);
}
@Override
@ -114,6 +124,12 @@ public class JavascriptExtractionFn implements ExtractionFn
return false;
}
@Override
public ExtractionType getExtractionType()
{
return injective ? ExtractionType.ONE_TO_ONE : ExtractionType.MANY_TO_ONE;
}
@Override
public String toString()
{


@ -0,0 +1,111 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.metamx.common.StringUtils;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class LookupExtractionFn extends FunctionalExtraction
{
private static final byte CACHE_TYPE_ID = 0x5;
private final LookupExtractor lookup;
@JsonCreator
public LookupExtractionFn(
@JsonProperty("lookup")
final LookupExtractor lookup,
@JsonProperty("retainMissingValue")
final boolean retainMissingValue,
@Nullable
@JsonProperty("replaceMissingValueWith")
final String replaceMissingValueWith,
@JsonProperty("injective")
final boolean injective
)
{
super(
new Function<String, String>()
{
@Nullable
@Override
public String apply(String input)
{
return lookup.apply(Strings.nullToEmpty(input));
}
},
retainMissingValue,
replaceMissingValueWith,
injective
);
this.lookup = lookup;
}
@JsonProperty
public LookupExtractor getLookup()
{
return lookup;
}
@Override
@JsonProperty
public boolean isRetainMissingValue() {return super.isRetainMissingValue();}
@Override
@JsonProperty
public String getReplaceMissingValueWith() {return super.getReplaceMissingValueWith();}
@Override
@JsonProperty
public boolean isInjective()
{
return super.isInjective();
}
@Override
public byte[] getCacheKey()
{
try {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
outputStream.write(CACHE_TYPE_ID);
outputStream.write(lookup.getCacheKey());
if (getReplaceMissingValueWith() != null) {
outputStream.write(StringUtils.toUtf8(getReplaceMissingValueWith()));
}
outputStream.write(isInjective() ? 1 : 0);
outputStream.write(isRetainMissingValue() ? 1 : 0);
return outputStream.toByteArray();
}
catch (IOException ex) {
// If ByteArrayOutputStream.write has problems, that is a very bad thing
throw Throwables.propagate(ex);
}
}
}
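A Java-side sketch of the second JSON example from the docs above (assumes the classes in this diff plus Guava on the classpath):
```java
import com.google.common.collect.ImmutableMap;

final LookupExtractionFn fn = new LookupExtractionFn(
    new MapLookupExtractor(ImmutableMap.of("foo", "bar", "baz", "bat")),
    false,      // retainMissingValue
    "MISSING",  // replaceMissingValueWith
    false       // injective
);
fn.apply("foo");      // "bar"
fn.apply("unknown");  // "MISSING"
```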


@ -0,0 +1,47 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "map", value = MapLookupExtractor.class)
})
public interface LookupExtractor
{
/**
* Apply a particular lookup methodology to the input string
* @param key The value to apply the lookup to. May not be null
* @return The looked-up value, or null if the key cannot have the lookup applied to it and should be treated as missing.
*/
@Nullable String apply(@NotNull String key);
/**
* Create a cache key for use in results caching
* @return A byte array that can be used to uniquely identify this lookup, so results of a prior lookup can be reused from cache
*/
@NotNull byte[] getCacheKey();
}


@ -0,0 +1,87 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.StringUtils;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
@JsonTypeName("map")
public class MapLookupExtractor implements LookupExtractor
{
private final Map<String, String> map;
@JsonCreator
public MapLookupExtractor(
@JsonProperty("map") Map<String, String> map
)
{
this.map = Preconditions.checkNotNull(map, "map");
}
@JsonProperty
public Map<String, String> getMap()
{
return ImmutableMap.copyOf(map);
}
@Nullable
@Override
public String apply(@NotNull String val)
{
return map.get(val);
}
@Override
public byte[] getCacheKey()
{
try {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
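// Note: 0xFF never occurs in valid UTF-8, so it can safely delimit the UTF-8 keys and values written below.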
for (Map.Entry<String, String> entry : map.entrySet()) {
final String key = entry.getKey();
final String val = entry.getValue();
if (!Strings.isNullOrEmpty(key)) {
outputStream.write(StringUtils.toUtf8(key));
}
outputStream.write((byte)0xFF);
if (!Strings.isNullOrEmpty(val)) {
outputStream.write(StringUtils.toUtf8(val));
}
outputStream.write((byte)0xFF);
}
return outputStream.toByteArray();
}
catch (IOException ex) {
// If ByteArrayOutputStream.write has problems, that is a very bad thing
throw Throwables.propagate(ex);
}
}
}


@ -76,6 +76,12 @@ public class MatchingDimExtractionFn extends DimExtractionFn
return false;
}
@Override
public ExtractionType getExtractionType()
{
return ExtractionType.MANY_TO_ONE;
}
@Override
public String toString()
{


@ -75,6 +75,12 @@ public class RegexDimExtractionFn extends DimExtractionFn
return false;
}
@Override
public ExtractionType getExtractionType()
{
return ExtractionType.MANY_TO_ONE;
}
@Override
public String toString()
{


@ -70,6 +70,12 @@ public class SearchQuerySpecDimExtractionFn extends DimExtractionFn
return true;
}
@Override
public ExtractionType getExtractionType()
{
return ExtractionType.MANY_TO_ONE;
}
@Override
public String toString()
{


@ -96,6 +96,12 @@ public class TimeDimExtractionFn extends DimExtractionFn
return false;
}
@Override
public ExtractionType getExtractionType()
{
return ExtractionType.MANY_TO_ONE;
}
@Override
public String toString()
{


@ -111,6 +111,12 @@ public class TimeFormatExtractionFn implements ExtractionFn
return false;
}
@Override
public ExtractionType getExtractionType()
{
return ExtractionType.MANY_TO_ONE;
}
@Override
public boolean equals(Object o)
{


@ -96,7 +96,7 @@ public class GroupByQuery extends BaseQuery<Row>
this.aggregatorSpecs = aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
this.havingSpec = havingSpec;
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
this.limitSpec = (limitSpec == null) ? new NoopLimitSpec() : limitSpec;
Preconditions.checkNotNull(this.granularity, "Must specify a granularity");
Preconditions.checkNotNull(this.aggregatorSpecs, "Must specify at least one aggregator");
@ -275,6 +275,23 @@ public class GroupByQuery extends BaseQuery<Row>
);
}
public GroupByQuery withDimensionSpecs(final List<DimensionSpec> dimensionSpecs)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
getDimFilter(),
getGranularity(),
dimensionSpecs,
getAggregatorSpecs(),
getPostAggregatorSpecs(),
getHavingSpec(),
getLimitSpec(),
limitFn,
getContext()
);
}
public static class Builder
{
private DataSource dataSource;


@ -42,11 +42,9 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.filter.Filters;
import org.joda.time.DateTime;


@ -20,9 +20,13 @@ package io.druid.query.groupby;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
@ -43,27 +47,34 @@ import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Global;
import io.druid.query.CacheStrategy;
import io.druid.query.DataSource;
import io.druid.query.DruidMetrics;
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryDataSource;
import io.druid.query.DruidMetrics;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SubqueryQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.DimFilter;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
@ -256,12 +267,12 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
public ServiceMetricEvent.Builder makeMetricBuilder(GroupByQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query)
.setDimension("numDimensions", String.valueOf(query.getDimensions().size()))
.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()))
.setDimension(
"numComplexMetrics",
String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
);
.setDimension("numDimensions", String.valueOf(query.getDimensions().size()))
.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size()))
.setDimension(
"numComplexMetrics",
String.valueOf(DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()))
);
}
@Override
@ -288,6 +299,62 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
};
}
@Override
public Function<Row, Row> makePostComputeManipulatorFn(
final GroupByQuery query,
final MetricManipulationFn fn
)
{
final Set<String> optimizedDims = ImmutableSet.copyOf(
Iterables.transform(
extractionsToRewrite(query),
new Function<DimensionSpec, String>()
{
@Override
public String apply(DimensionSpec input)
{
return input.getOutputName();
}
}
)
);
final Function<Row, Row> preCompute = makePreComputeManipulatorFn(query, fn);
if (optimizedDims.isEmpty()) {
return preCompute;
}
// If we have optimizations that can be done at this level, we apply them here
final Map<String, ExtractionFn> extractionFnMap = new HashMap<>();
for (DimensionSpec dimensionSpec : query.getDimensions()) {
final String dimension = dimensionSpec.getOutputName();
if (optimizedDims.contains(dimension)) {
extractionFnMap.put(dimension, dimensionSpec.getExtractionFn());
}
}
return new Function<Row, Row>()
{
@Nullable
@Override
public Row apply(Row input)
{
Row preRow = preCompute.apply(input);
if (preRow instanceof MapBasedRow) {
MapBasedRow preMapRow = (MapBasedRow) preRow;
Map<String, Object> event = Maps.newHashMap(preMapRow.getEvent());
for (String dim : optimizedDims) {
final Object eventVal = event.get(dim);
event.put(dim, extractionFnMap.get(dim).apply(eventVal));
}
return new MapBasedRow(preMapRow.getTimestamp(), event);
} else {
return preRow;
}
}
};
}
@Override
public TypeReference<Row> getResultTypeReference()
{
@ -295,10 +362,49 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
@Override
public QueryRunner<Row> preMergeQueryDecoration(QueryRunner<Row> runner)
public QueryRunner<Row> preMergeQueryDecoration(final QueryRunner<Row> runner)
{
return new SubqueryQueryRunner<>(
intervalChunkingQueryRunnerDecorator.decorate(runner, this)
intervalChunkingQueryRunnerDecorator.decorate(
new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(Query<Row> query, Map<String, Object> responseContext)
{
if (!(query instanceof GroupByQuery)) {
return runner.run(query, responseContext);
}
GroupByQuery groupByQuery = (GroupByQuery) query;
ArrayList<DimensionSpec> dimensionSpecs = new ArrayList<>();
Set<String> optimizedDimensions = ImmutableSet.copyOf(
Iterables.transform(
extractionsToRewrite(groupByQuery),
new Function<DimensionSpec, String>()
{
@Override
public String apply(DimensionSpec input)
{
return input.getDimension();
}
}
)
);
for (DimensionSpec dimensionSpec : groupByQuery.getDimensions()) {
if (optimizedDimensions.contains(dimensionSpec.getDimension())) {
dimensionSpecs.add(
new DefaultDimensionSpec(dimensionSpec.getDimension(), dimensionSpec.getOutputName())
);
} else {
dimensionSpecs.add(dimensionSpec);
}
}
return runner.run(
groupByQuery.withDimensionSpecs(dimensionSpecs),
responseContext
);
}
}, this
)
);
}
@ -426,4 +532,30 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
};
}
/**
* This function checks the query for dimensions which can be optimized by applying the dimension extraction
* as the final step of the query instead of on every event.
*
* @param query The query to check for optimizations
*
* @return A collection of DimensionSpec which can be applied at the very end, upon query completion.
*/
public static Collection<DimensionSpec> extractionsToRewrite(GroupByQuery query)
{
return Collections2.filter(
query.getDimensions(), new Predicate<DimensionSpec>()
{
@Override
public boolean apply(DimensionSpec input)
{
return input.getExtractionFn() != null
&& ExtractionFn.ExtractionType.ONE_TO_ONE.equals(
input.getExtractionFn().getExtractionType()
);
}
}
);
}
}


@ -19,6 +19,7 @@ package io.druid.query.topn;
import com.google.common.base.Function;
import io.druid.query.Result;
import io.druid.query.extraction.ExtractionFn;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
@ -27,7 +28,6 @@ public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
private final TopNQuery query;
private final TopNAlgorithm topNAlgorithm;
public TopNMapFn(
TopNQuery query,
TopNAlgorithm topNAlgorithm


@ -161,6 +161,35 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
);
}
public TopNQuery withDimensionSpec(DimensionSpec spec)
{
return new TopNQuery(
getDataSource(),
spec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
}
public TopNQuery withPostAggregatorSpecs(List<PostAggregator> postAggregatorSpecs)
{
return new TopNQuery(
getDataSource(),
getDimensionSpec(),
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
}
@Override
public Query<Result<TopNResultValue>> withDataSource(DataSource dataSource)
{


@ -27,6 +27,7 @@ import io.druid.collections.StupidPool;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.Filter;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
@ -112,7 +113,7 @@ public class TopNQueryEngine
// A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(capabilities, query);
} else if(selector.isHasExtractionFn()) {
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new DimExtractionTopNAlgorithm(capabilities, query);
} else if (selector.isAggregateAllMetrics()) {
topNAlgorithm = new PooledTopNAlgorithm(capabilities, query, bufferPool);
@ -124,4 +125,12 @@ public class TopNQueryEngine
return new TopNMapFn(query, topNAlgorithm);
}
public static boolean canApplyExtractionInPost(TopNQuery query)
{
return query.getDimensionSpec() != null
&& query.getDimensionSpec().getExtractionFn() != null
&& ExtractionFn.ExtractionType.ONE_TO_ONE.equals(query.getDimensionSpec().getExtractionFn().getExtractionType())
&& query.getTopNMetricSpec().canBeOptimizedUnordered();
}
}


@ -48,6 +48,9 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorUtil;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
@ -235,6 +238,9 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
final TopNQuery query, final MetricManipulationFn fn
)
{
final ExtractionFn extractionFn = TopNQueryEngine.canApplyExtractionInPost(query)
? query.getDimensionSpec().getExtractionFn()
: null;
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
private String dimension = query.getDimensionSpec().getOutputName();
@ -278,7 +284,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
values.put(name, fn.manipulate(aggregatorFactories[i], input.getMetric(name)));
}
values.put(dimension, input.getDimensionValue(dimension));
final Object dimValue = input.getDimensionValue(dimension);
values.put(dimension, extractionFn == null ? dimValue : extractionFn.apply(dimValue));
return values;
}
@ -423,15 +430,100 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(QueryRunner<Result<TopNResultValue>> runner)
public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(final QueryRunner<Result<TopNResultValue>> runner)
{
return intervalChunkingQueryRunnerDecorator.decorate(runner, this);
return intervalChunkingQueryRunnerDecorator.decorate(
new QueryRunner<Result<TopNResultValue>>()
{
@Override
public Sequence<Result<TopNResultValue>> run(
Query<Result<TopNResultValue>> query, Map<String, Object> responseContext
)
{
if (!(query instanceof TopNQuery)) {
return runner.run(query, responseContext);
} else {
final TopNQuery topNQuery = (TopNQuery) query;
if (TopNQueryEngine.canApplyExtractionInPost(topNQuery)) {
final DimensionSpec dimensionSpec = topNQuery.getDimensionSpec();
return runner.run(
topNQuery.withDimensionSpec(
new DefaultDimensionSpec(
dimensionSpec.getDimension(),
dimensionSpec.getOutputName()
)
), responseContext
);
} else {
return runner.run(query, responseContext);
}
}
}
}
, this
);
}
@Override
public QueryRunner<Result<TopNResultValue>> postMergeQueryDecoration(final QueryRunner<Result<TopNResultValue>> runner)
{
return new ThresholdAdjustingQueryRunner(runner, config.getMinTopNThreshold());
final ThresholdAdjustingQueryRunner thresholdRunner = new ThresholdAdjustingQueryRunner(
runner,
config.getMinTopNThreshold()
);
return new QueryRunner<Result<TopNResultValue>>()
{
@Override
public Sequence<Result<TopNResultValue>> run(
final Query<Result<TopNResultValue>> query, final Map<String, Object> responseContext
)
{
// thresholdRunner.run throws ISE if query is not TopNQuery
final Sequence<Result<TopNResultValue>> resultSequence = thresholdRunner.run(query, responseContext);
final TopNQuery topNQuery = (TopNQuery) query;
if (!TopNQueryEngine.canApplyExtractionInPost(topNQuery)) {
return resultSequence;
} else {
return Sequences.map(
resultSequence, new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
{
TopNResultValue resultValue = input.getValue();
return new Result<TopNResultValue>(
input.getTimestamp(),
new TopNResultValue(
Lists.transform(
resultValue.getValue(),
new Function<DimensionAndMetricValueExtractor, DimensionAndMetricValueExtractor>()
{
@Override
public DimensionAndMetricValueExtractor apply(
DimensionAndMetricValueExtractor input
)
{
String dimOutputName = topNQuery.getDimensionSpec().getOutputName();
String dimValue = input.getStringDimensionValue(dimOutputName);
Map<String, Object> map = input.getBaseObject();
map.put(
dimOutputName,
topNQuery.getDimensionSpec().getExtractionFn().apply(dimValue)
);
return input;
}
}
)
)
);
}
}
);
}
}
};
}
public Ordering<Result<TopNResultValue>> getOrdering()


@ -18,9 +18,11 @@
package io.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.UOE;
import com.metamx.common.guava.Sequence;
import io.druid.granularity.QueryGranularity;
@ -48,8 +50,10 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -202,9 +206,23 @@ public class QueryRunnerTestHelper
Arrays.asList(new Interval("2020-04-02T00:00:00.000Z/P1D"))
);
@SuppressWarnings("unchecked")
public static Collection<?> makeQueryRunners(
QueryRunnerFactory factory
public static Iterable<Object[]> transformToConstructionFeeder(Iterable<?> in)
{
return Iterables.transform(
in, new Function<Object, Object[]>()
{
@Nullable
@Override
public Object[] apply(@Nullable Object input)
{
return new Object[]{input};
}
}
);
}
public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunners(
QueryRunnerFactory<T, QueryType> factory
)
throws IOException
{
@ -212,21 +230,11 @@ public class QueryRunnerTestHelper
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
final IncrementalIndex rtIndexOffheap = TestIndex.getIncrementalTestIndex(true);
return Arrays.asList(
new Object[][]{
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
},
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
}
}
return ImmutableList.of(
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)),
makeQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex)),
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId))
);
}
@ -263,9 +271,63 @@ public class QueryRunnerTestHelper
}
);
}
/**
* Iterate through the iterables in a synchronous manner and return each step as an Object[]
* @param in The iterables to step through. (effectively columns)
* @return An iterable of Object[] containing the "rows" assembled from the input columns
*/
public static Iterable<Object[]> transformToConstructionFeeder(Iterable<?>... in)
{
if (in == null) {
return ImmutableList.<Object[]>of();
}
final List<Iterable<?>> iterables = Arrays.asList(in);
final int length = in.length;
final List<Iterator<?>> iterators = new ArrayList<>(in.length);
for (Iterable<?> iterable : iterables) {
iterators.add(iterable.iterator());
}
return new Iterable<Object[]>()
{
@Override
public Iterator<Object[]> iterator()
{
return new Iterator<Object[]>()
{
@Override
public boolean hasNext()
{
int hasMore = 0;
for (Iterator<?> it : iterators) {
if (it.hasNext()) {
++hasMore;
}
}
return hasMore == length;
}
@Override
public Object[] next()
{
final ArrayList<Object> list = new ArrayList<Object>(length);
for (Iterator<?> it : iterators) {
list.add(it.next());
}
return list.toArray();
}
@Override
public void remove()
{
throw new UOE("Remove not supported");
}
};
}
};
}
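A hypothetical usage of this lockstep variant, zipping two "columns" into parameterized-test rows (values are illustrative):
```java
Iterable<Object[]> rows = QueryRunnerTestHelper.transformToConstructionFeeder(
    Arrays.asList("runnerA", "runnerB"), // first constructor argument column
    Arrays.asList(1, 2)                  // second constructor argument column
);
// yields {"runnerA", 1} and {"runnerB", 2}
```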
public static <T> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
Segment adapter
)
{
@ -274,7 +336,7 @@ public class QueryRunnerTestHelper
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
),
factory.getToolchest()
(QueryToolChest<T, Query<T>>)factory.getToolchest()
);
}


@ -0,0 +1,239 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
@RunWith(Parameterized.class)
public class FunctionalExtractionTest
{
private static class SimpleFunctionExtraction extends FunctionalExtraction
{
public SimpleFunctionExtraction(
Function<String, String> extractionFunction,
Boolean retainMissingValue,
String replaceMissingValueWith,
Boolean uniqueProjections
)
{
super(extractionFunction, retainMissingValue, replaceMissingValueWith, uniqueProjections);
}
@Override
public byte[] getCacheKey()
{
return new byte[0];
}
}
private static final Function<String, String> NULL_FN = new Function<String, String>()
{
@Nullable
@Override
public String apply(String input)
{
return null;
}
};
private static final Function<String, String> TURTLE_FN = new Function<String, String>()
{
@Nullable
@Override
public String apply(@Nullable String input)
{
return "turtles";
}
};
private static final Function<String, String> EMPTY_STR_FN = new Function<String, String>()
{
@Nullable
@Override
public String apply(@Nullable String input)
{
return "";
}
};
private static final Function<String, String> IDENTITY_FN = new Function<String, String>()
{
@Nullable
@Override
public String apply(@Nullable String input)
{
return input;
}
};
private static final Function<String, String> ONLY_PRESENT = new Function<String, String>()
{
@Nullable
@Override
public String apply(@Nullable String input)
{
return PRESENT_KEY.equals(input) ? PRESENT_VALUE : null;
}
};
private static String PRESENT_KEY = "present";
private static String PRESENT_VALUE = "present_value";
private static String MISSING = "missing";
@Parameterized.Parameters
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{NULL_FN},
new Object[]{TURTLE_FN},
new Object[]{EMPTY_STR_FN},
new Object[]{IDENTITY_FN},
new Object[]{ONLY_PRESENT}
);
}
private final Function<String, String> fn;
public FunctionalExtractionTest(Function<String, String> fn)
{
this.fn = fn;
}
@Test
public void testRetainMissing()
{
final String in = "NOT PRESENT";
final FunctionalExtraction exFn = new SimpleFunctionExtraction(
fn,
true,
null,
false
);
final String out = fn.apply(in);
Assert.assertEquals(Strings.isNullOrEmpty(out) ? in : out, exFn.apply(in));
}
@Test
public void testReplaceMissing()
{
final String in = "NOT PRESENT";
final FunctionalExtraction exFn = new SimpleFunctionExtraction(
fn,
false,
MISSING,
false
);
final String out = fn.apply(in);
Assert.assertEquals(Strings.isNullOrEmpty(out) ? MISSING : out, exFn.apply(in));
}
@Test
public void testReplaceMissingBlank()
{
final String in = "NOT PRESENT";
final FunctionalExtraction exFn = new SimpleFunctionExtraction(
fn,
false,
"",
false
);
final String out = fn.apply(in);
Assert.assertEquals(Strings.isNullOrEmpty(out) ? null : out, exFn.apply(in));
}
@Test
public void testOnlyOneValuePresent()
{
final String in = PRESENT_KEY;
final FunctionalExtraction exFn = new SimpleFunctionExtraction(
fn,
false,
"",
false
);
final String out = fn.apply(in);
Assert.assertEquals(Strings.isNullOrEmpty(out) ? null : out, exFn.apply(in));
}
@Test
public void testNullInputs()
{
final FunctionalExtraction exFn = new SimpleFunctionExtraction(
fn,
true,
null,
false
);
if (Strings.isNullOrEmpty(fn.apply(null))) {
Assert.assertEquals(null, exFn.apply(null));
}
}
@Test(expected = IllegalArgumentException.class)
public void testBadConfig()
{
final FunctionalExtraction exFn = new SimpleFunctionExtraction(
fn,
true,
MISSING,
false
);
}
@Test
public void testUniqueProjections()
{
Assert.assertEquals(
ExtractionFn.ExtractionType.MANY_TO_ONE,
new SimpleFunctionExtraction(
fn,
true,
null,
false
).getExtractionType()
);
Assert.assertEquals(
ExtractionFn.ExtractionType.ONE_TO_ONE,
new SimpleFunctionExtraction(
fn,
true,
null,
true
).getExtractionType()
);
}
}


@ -0,0 +1,79 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Injector;
import com.google.inject.Key;
import io.druid.guice.GuiceInjectors;
import io.druid.guice.annotations.Json;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
/**
*
*/
public class MapLookupExtractionFnSerDeTest
{
private static ObjectMapper mapper;
private static final Map<String, String> renames = ImmutableMap.of(
"foo", "bar",
"bar", "baz"
);
@BeforeClass
public static void setup() throws JsonProcessingException
{
Injector defaultInjector = GuiceInjectors.makeStartupInjector();
mapper = defaultInjector.getInstance(Key.get(ObjectMapper.class, Json.class));
}
@Test
public void testDeserialization() throws IOException
{
final DimExtractionFn fn = mapper.reader(DimExtractionFn.class).readValue(
String.format(
"{\"type\":\"lookup\",\"lookup\":{\"type\":\"map\", \"map\":%s}}",
mapper.writeValueAsString(renames)
)
);
for (String key : renames.keySet()) {
Assert.assertEquals(renames.get(key), fn.apply(key));
}
final String crazyString = UUID.randomUUID().toString();
Assert.assertEquals(null, fn.apply(crazyString));
Assert.assertEquals(
crazyString, mapper.reader(DimExtractionFn.class).<DimExtractionFn>readValue(
String.format(
"{\"type\":\"lookup\",\"lookup\":{\"type\":\"map\", \"map\":%s}, \"retainMissingValue\":true}",
mapper.writeValueAsString(renames)
)
).apply(crazyString)
);
}
}


@ -46,7 +46,7 @@ public class JavascriptExtractionFnTest
public void testJavascriptSubstring()
{
String function = "function(str) { return str.substring(0,3); }";
ExtractionFn extractionFn = new JavascriptExtractionFn(function);
ExtractionFn extractionFn = new JavascriptExtractionFn(function, false);
for (String str : testStrings) {
String res = extractionFn.apply(str);
@ -59,28 +59,28 @@ public class JavascriptExtractionFnTest
{
String utcHour = "function(t) {\nreturn 'Second ' + Math.floor((t % 60000) / 1000);\n}";
final long millis = new DateTime("2015-01-02T13:00:59.999Z").getMillis();
Assert.assertEquals("Second 59" , new JavascriptExtractionFn(utcHour).apply(millis));
Assert.assertEquals("Second 59" , new JavascriptExtractionFn(utcHour, false).apply(millis));
}
@Test
public void testLongs() throws Exception
{
String typeOf = "function(x) {\nreturn typeof x\n}";
Assert.assertEquals("number", new JavascriptExtractionFn(typeOf).apply(1234L));
Assert.assertEquals("number", new JavascriptExtractionFn(typeOf, false).apply(1234L));
}
@Test
public void testFloats() throws Exception
{
String typeOf = "function(x) {\nreturn typeof x\n}";
Assert.assertEquals("number", new JavascriptExtractionFn(typeOf).apply(1234.0));
Assert.assertEquals("number", new JavascriptExtractionFn(typeOf, false).apply(1234.0));
}
@Test
public void testCastingAndNull()
{
String function = "function(x) {\n x = Number(x);\n if(isNaN(x)) return null;\n return Math.floor(x / 5) * 5;\n}";
ExtractionFn extractionFn = new JavascriptExtractionFn(function);
ExtractionFn extractionFn = new JavascriptExtractionFn(function, false);
Iterator<String> it = Iterators.forArray("0", "5", "5", "10", null);
@ -95,7 +95,7 @@ public class JavascriptExtractionFnTest
public void testJavascriptRegex()
{
String function = "function(str) { return str.replace(/[aeiou]/g, ''); }";
ExtractionFn extractionFn = new JavascriptExtractionFn(function);
ExtractionFn extractionFn = new JavascriptExtractionFn(function, false);
Iterator it = Iterators.forArray("Qt", "Clgry", "Tky", "Stckhlm", "Vncvr", "Prtr", "Wllngtn", "Ontr");
for (String str : testStrings) {
@ -299,7 +299,7 @@ public class JavascriptExtractionFnTest
+ ""
+ "}";
ExtractionFn extractionFn = new JavascriptExtractionFn(function);
ExtractionFn extractionFn = new JavascriptExtractionFn(function, false);
Iterator<String> inputs = Iterators.forArray("introducing", "exploratory", "analytics", "on", "large", "datasets");
Iterator<String> it = Iterators.forArray("introduc", "exploratori", "analyt", "on", "larg", "dataset");
@ -328,4 +328,11 @@ public class JavascriptExtractionFnTest
)
);
}
@Test
public void testInjective()
{
Assert.assertEquals(ExtractionFn.ExtractionType.MANY_TO_ONE, new JavascriptExtractionFn("function(str) { return str; }", false).getExtractionType());
Assert.assertEquals(ExtractionFn.ExtractionType.ONE_TO_ONE, new JavascriptExtractionFn("function(str) { return str; }", true).getExtractionType());
}
}


@ -0,0 +1,80 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction.extraction;
import com.google.common.collect.ImmutableMap;
import io.druid.query.extraction.LookupExtractionFn;
import io.druid.query.extraction.MapLookupExtractor;
import org.junit.Assert;
import org.junit.Test;
/**
* This test suite clarifies some behavior around specific corner cases
*/
public class LookupExtractionFnExpectationsTest
{
@Test
public void testMissingKeyIsNull()
{
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
true,
null,
false
);
Assert.assertNull(lookupExtractionFn.apply(null));
}
@Test
public void testMissingKeyIsReplaced()
{
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
false,
"REPLACE",
false
);
Assert.assertEquals("REPLACE", lookupExtractionFn.apply(null));
}
@Test
public void testNullKeyIsMappable()
{
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("", "bar")),
false,
"REPLACE",
false
);
Assert.assertEquals("bar", lookupExtractionFn.apply(null));
}
@Test
public void testNullValue()
{
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "")),
false,
"REPLACE",
false
);
Assert.assertEquals("REPLACE", lookupExtractionFn.apply(null));
}
}


@ -0,0 +1,198 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.extraction.extraction;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.IAE;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.extraction.LookupExtractionFn;
import io.druid.query.extraction.MapLookupExtractor;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@RunWith(Parameterized.class)
public class LookupExtractionFnTest
{
@Parameterized.Parameters
public static Iterable<Object[]> constructorFeeder()
{
return Iterables.transform(
Sets.cartesianProduct(
ImmutableList.of(
ImmutableSet.of(true, false),
ImmutableSet.of("", "MISSING VALUE"),
ImmutableSet.of(true, false)
)
), new Function<List<?>, Object[]>()
{
@Nullable
@Override
public Object[] apply(List<?> input)
{
return input.toArray();
}
}
);
}
private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
private final boolean retainMissing;
private final String replaceMissing;
private final boolean injective;
public LookupExtractionFnTest(boolean retainMissing, String replaceMissing, boolean injective)
{
this.replaceMissing = Strings.emptyToNull(replaceMissing);
this.retainMissing = retainMissing;
this.injective = injective;
}
@BeforeClass
public static void setUpStatic()
{
OBJECT_MAPPER.registerSubtypes(LookupExtractionFn.class);
}
@Test
public void testSimpleSerDe() throws IOException
{
if (retainMissing && !Strings.isNullOrEmpty(replaceMissing)) {
// skip
return;
}
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
replaceMissing,
injective
);
final String str1 = OBJECT_MAPPER.writeValueAsString(lookupExtractionFn);
final LookupExtractionFn lookupExtractionFn2 = OBJECT_MAPPER.readValue(str1, LookupExtractionFn.class);
Assert.assertEquals(retainMissing, lookupExtractionFn2.isRetainMissingValue());
Assert.assertEquals(replaceMissing, lookupExtractionFn2.getReplaceMissingValueWith());
Assert.assertEquals(injective, lookupExtractionFn2.isInjective());
Assert.assertArrayEquals(lookupExtractionFn.getCacheKey(), lookupExtractionFn2.getCacheKey());
Assert.assertEquals(
str1,
OBJECT_MAPPER.writeValueAsString(lookupExtractionFn2)
);
}
@Test(expected = IllegalArgumentException.class)
public void testIllegalArgs()
{
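// For the illegal retain+replace combination the constructor itself throws; for every other combination, throw IAE (an IllegalArgumentException) so the expected-exception contract still holds.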
if (retainMissing && !Strings.isNullOrEmpty(replaceMissing)) {
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
Strings.emptyToNull(replaceMissing),
injective
);
} else {
throw new IAE("Case not valid");
}
}
@Test
public void testCacheKey()
{
if (retainMissing && !Strings.isNullOrEmpty(replaceMissing)) {
// Skip: illegal combination, covered by testIllegalArgs.
return;
}
final Map<String, String> weirdMap = Maps.newHashMap();
weirdMap.put("foobar", null);
final LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
replaceMissing,
injective
);
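// Changing the lookup map or the injective flag must always change the cache key; flipping retainMissingValue is only checked when the flipped combination is legal.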
if (Strings.isNullOrEmpty(replaceMissing) || retainMissing) {
Assert.assertFalse(
Arrays.equals(
lookupExtractionFn.getCacheKey(),
new LookupExtractionFn(
lookupExtractionFn.getLookup(),
!lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
lookupExtractionFn.isInjective()
).getCacheKey()
)
);
Assert.assertFalse(
Arrays.equals(
lookupExtractionFn.getCacheKey(),
new LookupExtractionFn(
lookupExtractionFn.getLookup(),
!lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
!lookupExtractionFn.isInjective()
).getCacheKey()
)
);
}
Assert.assertFalse(
Arrays.equals(
lookupExtractionFn.getCacheKey(),
new LookupExtractionFn(
new MapLookupExtractor(weirdMap),
lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
lookupExtractionFn.isInjective()
).getCacheKey()
)
);
Assert.assertFalse(
Arrays.equals(
lookupExtractionFn.getCacheKey(),
new LookupExtractionFn(
lookupExtractionFn.getLookup(),
lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
!lookupExtractionFn.isInjective()
).getCacheKey()
)
);
}
}

View File

@ -57,8 +57,9 @@ import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ExtractionDimensionSpec;
import io.druid.query.extraction.DimExtractionFn;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.LookupExtractionFn;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.extraction.RegexDimExtractionFn;
import io.druid.query.extraction.TimeFormatExtractionFn;
import io.druid.query.filter.DimFilter;
@ -93,6 +94,7 @@ import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -174,7 +176,7 @@ public class GroupByQueryRunnerTest
@Override
public Object apply(@Nullable Object input)
{
return new Object[]{factory, ((Object[]) input)[0]};
return new Object[]{factory, input};
}
};
@ -241,6 +243,156 @@ public class GroupByQueryRunnerTest
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithRebucketRename()
{
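// injective = false: the lookup is MANY_TO_ONE, so the unoptimized rename codepath is exercised.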
Map<String, String> map = new HashMap<>();
map.put("automotive", "automotive0");
map.put("business", "business0");
map.put("entertainment", "entertainment0");
map.put("health", "health0");
map.put("mezzanine", "mezzanine0");
map.put("news", "news0");
map.put("premium", "premium0");
map.put("technology", "technology0");
map.put("travel", "travel0");
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), false, null, false), null
)
)
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive0", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business0", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-01",
"alias",
"entertainment0",
"rows",
1L,
"idx",
158L
),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health0", "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine0", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news0", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium0", "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology0", "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel0", "rows", 1L, "idx", 119L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive0", "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business0", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-02",
"alias",
"entertainment0",
"rows",
1L,
"idx",
166L
),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health0", "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine0", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news0", "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium0", "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology0", "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel0", "rows", 1L, "idx", 126L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithSimpleRename()
{
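// injective = true: the lookup is declared ONE_TO_ONE, enabling the optimized rename codepath; expected results match the unoptimized test above.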
Map<String, String> map = new HashMap<>();
map.put("automotive", "automotive0");
map.put("business", "business0");
map.put("entertainment", "entertainment0");
map.put("health", "health0");
map.put("mezzanine", "mezzanine0");
map.put("news", "news0");
map.put("premium", "premium0");
map.put("technology", "technology0");
map.put("travel", "travel0");
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), false, null, true), null
)
)
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(QueryRunnerTestHelper.dayGran)
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive0", "rows", 1L, "idx", 135L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business0", "rows", 1L, "idx", 118L),
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-01",
"alias",
"entertainment0",
"rows",
1L,
"idx",
158L
),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health0", "rows", 1L, "idx", 120L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine0", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news0", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium0", "rows", 3L, "idx", 2900L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology0", "rows", 1L, "idx", 78L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel0", "rows", 1L, "idx", 119L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive0", "rows", 1L, "idx", 147L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business0", "rows", 1L, "idx", 112L),
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-02",
"alias",
"entertainment0",
"rows",
1L,
"idx",
166L
),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health0", "rows", 1L, "idx", 113L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine0", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news0", "rows", 1L, "idx", 114L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium0", "rows", 3L, "idx", 2505L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology0", "rows", 1L, "idx", 97L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel0", "rows", 1L, "idx", 126L)
);
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithUniques()
{
@ -342,8 +494,7 @@ public class GroupByQueryRunnerTest
@Test
public void testGroupByWithNullProducingDimExtractionFn()
{
final ExtractionFn fn1 = new RegexDimExtractionFn("(\\w{1})");
final ExtractionFn nullExtractionFn = new DimExtractionFn()
final ExtractionFn nullExtractionFn = new RegexDimExtractionFn("(\\w{1})")
{
@Override
public byte[] getCacheKey()
@ -354,16 +505,9 @@ public class GroupByQueryRunnerTest
@Override
public String apply(String dimValue)
{
return dimValue.equals("mezzanine") ? null : fn1.apply(dimValue);
}
@Override
public boolean preservesOrdering()
{
return false;
return dimValue.equals("mezzanine") ? null : super.apply(dimValue);
}
};
GroupByQuery query = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
@ -417,8 +561,7 @@ public class GroupByQueryRunnerTest
*/
public void testGroupByWithEmptyStringProducingDimExtractionFn()
{
final ExtractionFn fn1 = new RegexDimExtractionFn("(\\w{1})");
final ExtractionFn emptyStringExtractionFn = new DimExtractionFn()
final ExtractionFn emptyStringExtractionFn = new RegexDimExtractionFn("(\\w{1})")
{
@Override
public byte[] getCacheKey()
@ -429,13 +572,7 @@ public class GroupByQueryRunnerTest
@Override
public String apply(String dimValue)
{
return dimValue.equals("mezzanine") ? "" : fn1.apply(dimValue);
}
@Override
public boolean preservesOrdering()
{
return false;
return dimValue.equals("mezzanine") ? "" : super.apply(dimValue);
}
};
@ -3262,9 +3399,158 @@ public class GroupByQueryRunnerTest
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(Executors.newCachedThreadPool(), singleSegmentRunners)),
toolChest
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, singleSegmentRunners)), // reuse exec so shutdownNow() below stops the pool actually in use
toolChest
)
);
TestHelper.assertExpectedObjects(bySegmentResults, theRunner.run(fullQuery, Maps.newHashMap()), "");
exec.shutdownNow();
}
@Test
public void testBySegmentResultsUnOptimizedDimextraction()
{
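// Same bySegment scenario as the optimized variant below, but with injective = false (MANY_TO_ONE codepath).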
int segmentCount = 32;
Result<BySegmentResultValue> singleSegmentResult = new Result<BySegmentResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new BySegmentResultValueClass(
Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-01",
"alias",
"mezzanine0",
"rows",
6L,
"idx",
4420L
)
), "testSegment", new Interval("2011-04-02T00:00:00.000Z/2011-04-04T00:00:00.000Z")
)
);
List<Result> bySegmentResults = Lists.newArrayList();
for (int i = 0; i < segmentCount; i++) {
bySegmentResults.add(singleSegmentResult);
}
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality",
"alias",
new LookupExtractionFn(
new MapLookupExtractor(
ImmutableMap.of(
"mezzanine",
"mezzanine0"
)
), false, null, false
),
null
)
)
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setDimFilter(new SelectorDimFilter("quality", "mezzanine"))
.setContext(ImmutableMap.<String, Object>of("bySegment", true));
final GroupByQuery fullQuery = builder.build();
QueryToolChest toolChest = factory.getToolchest();
List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
for (int i = 0; i < segmentCount; i++) {
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, singleSegmentRunners)), // reuse exec so shutdownNow() below stops the pool actually in use
toolChest
)
);
TestHelper.assertExpectedObjects(bySegmentResults, theRunner.run(fullQuery, Maps.newHashMap()), "");
exec.shutdownNow();
}
@Test
public void testBySegmentResultsOptimizedDimextraction()
{
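// injective = true variant of the test above (ONE_TO_ONE codepath); expected bySegment results are identical.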
int segmentCount = 32;
Result<BySegmentResultValue> singleSegmentResult = new Result<BySegmentResultValue>(
new DateTime("2011-01-12T00:00:00.000Z"),
new BySegmentResultValueClass(
Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(
"2011-04-01",
"alias",
"mezzanine0",
"rows",
6L,
"idx",
4420L
)
), "testSegment", new Interval("2011-04-02T00:00:00.000Z/2011-04-04T00:00:00.000Z")
)
);
List<Result> bySegmentResults = Lists.newArrayList();
for (int i = 0; i < segmentCount; i++) {
bySegmentResults.add(singleSegmentResult);
}
GroupByQuery.Builder builder = GroupByQuery
.builder()
.setDataSource(QueryRunnerTestHelper.dataSource)
.setInterval("2011-04-02/2011-04-04")
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality",
"alias",
new LookupExtractionFn(
new MapLookupExtractor(
ImmutableMap.of(
"mezzanine",
"mezzanine0"
)
), false, null, true
),
null
)
)
)
.setAggregatorSpecs(
Arrays.asList(
QueryRunnerTestHelper.rowsCount,
new LongSumAggregatorFactory("idx", "index")
)
)
.setGranularity(new PeriodGranularity(new Period("P1M"), null, null))
.setDimFilter(new SelectorDimFilter("quality", "mezzanine"))
.setContext(ImmutableMap.<String, Object>of("bySegment", true));
final GroupByQuery fullQuery = builder.build();
QueryToolChest toolChest = factory.getToolchest();
List<QueryRunner<Row>> singleSegmentRunners = Lists.newArrayList();
for (int i = 0; i < segmentCount; i++) {
singleSegmentRunners.add(toolChest.preMergeQueryDecoration(runner));
}
ExecutorService exec = Executors.newCachedThreadPool();
QueryRunner theRunner = toolChest.postMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(factory.mergeRunners(exec, singleSegmentRunners)), // reuse exec so shutdownNow() below stops the pool actually in use
toolChest
)
);
TestHelper.assertExpectedObjects(bySegmentResults, theRunner.run(fullQuery, Maps.newHashMap()), "");
exec.shutdownNow();
}

View File

@ -21,6 +21,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
@ -38,10 +39,12 @@ import io.druid.query.timeseries.TimeseriesResultValue;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
@ -51,7 +54,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
{
@SuppressWarnings("unchecked")
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
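// Adapt each GroupBy runner into a timeseries runner: rewrite the TimeseriesQuery as an equivalent GroupByQuery, then map every resulting Row back onto a timeseries Result.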
GroupByQueryConfig config = new GroupByQueryConfig();
config.setMaxIntermediateRows(10000);
@ -75,64 +78,60 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
engine,
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
configSupplier,
new GroupByQueryQueryToolChest(configSupplier, new DefaultObjectMapper(),
engine, TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
TestQueryRunners.pool
new GroupByQueryQueryToolChest(
configSupplier, new DefaultObjectMapper(),
engine, TestQueryRunners.pool,
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
TestQueryRunners.pool
);
final Collection<?> objects = QueryRunnerTestHelper.makeQueryRunners(factory);
Object[][] newObjects = new Object[objects.size()][];
int i = 0;
for (Object object : objects) {
if (object instanceof Object[]) {
Object[] queryRunnerArray = (Object[]) object;
Preconditions.checkState(queryRunnerArray.length == 1);
Preconditions.checkState(queryRunnerArray[0] instanceof QueryRunner);
final QueryRunner groupByRunner = (QueryRunner) queryRunnerArray[0];
QueryRunner timeseriesRunner = new QueryRunner()
{
@Override
public Sequence run(Query query, Map responseContext)
{
TimeseriesQuery tsQuery = (TimeseriesQuery) query;
return Sequences.map(
groupByRunner.run(
GroupByQuery.builder()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build(),
responseContext
),
new Function<Row, Result<TimeseriesResultValue>>()
return QueryRunnerTestHelper.transformToConstructionFeeder(
Lists.transform(
QueryRunnerTestHelper.makeQueryRunners(factory),
new Function<QueryRunner<Row>, Object>()
{
@Nullable
@Override
public Object apply(final QueryRunner<Row> input)
{
return new QueryRunner()
{
@Override
public Result<TimeseriesResultValue> apply(final Row input)
public Sequence run(Query query, Map responseContext)
{
MapBasedRow row = (MapBasedRow) input;
TimeseriesQuery tsQuery = (TimeseriesQuery) query;
return new Result<TimeseriesResultValue>(
row.getTimestamp(), new TimeseriesResultValue(row.getEvent())
return Sequences.map(
input.run(
GroupByQuery.builder()
.setDataSource(tsQuery.getDataSource())
.setQuerySegmentSpec(tsQuery.getQuerySegmentSpec())
.setGranularity(tsQuery.getGranularity())
.setDimFilter(tsQuery.getDimensionsFilter())
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.build(),
responseContext
),
new Function<Row, Result<TimeseriesResultValue>>()
{
@Override
public Result<TimeseriesResultValue> apply(final Row input)
{
MapBasedRow row = (MapBasedRow) input;
return new Result<TimeseriesResultValue>(
row.getTimestamp(), new TimeseriesResultValue(row.getEvent())
);
}
}
);
}
}
);
}
};
newObjects[i] = new Object[]{timeseriesRunner};
++i;
}
}
return Arrays.asList(newObjects);
};
}
}
)
);
}
public GroupByTimeseriesQueryRunnerTest(QueryRunner runner)

View File

@ -17,6 +17,7 @@
package io.druid.query.search;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -37,6 +38,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@ -52,13 +54,17 @@ import java.util.Set;
public class SearchQueryRunnerTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new SearchQueryRunnerFactory(
new SearchQueryQueryToolChest(new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
return QueryRunnerTestHelper.transformToConstructionFeeder(
QueryRunnerTestHelper.makeQueryRunners(
new SearchQueryRunnerFactory(
new SearchQueryQueryToolChest(
new SearchQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
}

View File

@ -52,14 +52,18 @@ import java.util.Map;
public class SelectQueryRunnerTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
return QueryRunnerTestHelper.transformToConstructionFeeder(
QueryRunnerTestHelper.makeQueryRunners(
new SelectQueryRunnerFactory(
new SelectQueryQueryToolChest(
new DefaultObjectMapper(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new SelectQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
}

View File

@ -45,10 +45,12 @@ import java.util.Map;
public class TimeBoundaryQueryRunnerTest
{
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
return QueryRunnerTestHelper.transformToConstructionFeeder(
QueryRunnerTestHelper.makeQueryRunners(
new TimeBoundaryQueryRunnerFactory(QueryRunnerTestHelper.NOOP_QUERYWATCHER)
)
);
}

View File

@ -67,14 +67,17 @@ public class TimeseriesQueryRunnerTest
public static final Map<String, Object> CONTEXT = ImmutableMap.of();
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.makeQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
return QueryRunnerTestHelper.transformToConstructionFeeder(
QueryRunnerTestHelper.makeQueryRunners(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
);
}

View File

@ -128,6 +128,12 @@ public class ExtractionDimFilterTest
{
return false;
}
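// Report MANY_TO_ONE so callers cannot apply optimizations that assume the mapping is unique.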
@Override
public ExtractionType getExtractionType()
{
return ExtractionType.MANY_TO_ONE;
}
};
@Test