Optimization of extraction filter by reversing the lookup

Slim Bouguerra 2015-12-10 17:53:26 -05:00
parent efc90ee63b
commit 032d3bf6e6
28 changed files with 456 additions and 81 deletions

View File

@ -248,6 +248,57 @@ It is illegal to set `retainMissingValue = true` and also specify a `replaceMiss
A property of `injective` specifies whether optimizations can be used which assume there is no combining of multiple names into one. For example: if ABC123 is the only key that maps to SomeCompany, that can be optimized since it is a unique lookup. But if both ABC123 and DEF456 map to SomeCompany, then that is NOT a unique lookup. Setting this value to true and setting `retainMissingValue` to FALSE (the default) may cause undesired behavior.
A property `optimize` can be supplied to enable optimization of lookup-based extraction filters (by default `optimize = false`).
The optimization layer runs on the broker and rewrites the extraction filter as an OR clause of selector filters.
For instance, the following filter
```json
{
  "filter": {
    "type": "extraction",
    "dimension": "product",
    "value": "bar_1",
    "extractionFn": {
      "type": "lookup",
      "optimize": true,
      "lookup": {
        "type": "map",
        "map": {
          "product_1": "bar_1",
          "product_3": "bar_1"
        }
      }
    }
  }
}
```
will be rewritten as
```json
{
  "filter": {
    "type": "or",
    "fields": [
      {
        "type": "selector",
        "dimension": "product",
        "value": "product_1"
      },
      {
        "type": "selector",
        "dimension": "product",
        "value": "product_3"
      }
    ]
  }
}
```
A null dimension value can be mapped to a specific value by specifying the empty string as the key.
This allows distinguishing between a null dimension and a lookup resulting in a null.
For example, specifying `{"":"bar","bat":"baz"}` with dimension values `[null, "foo", "bat"]` and replacing missing values with `"oof"` will yield results of `["bar", "oof", "baz"]`.
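A minimal Java sketch of this behavior, using the five-argument `LookupExtractionFn` constructor introduced by this commit (the class name and printed values are illustrative; the null handling mirrors the expectations tests further down):
```java
import com.google.common.collect.ImmutableMap;

import io.druid.query.extraction.LookupExtractionFn;
import io.druid.query.extraction.MapLookupExtractor;

public class NullKeyLookupSketch
{
  public static void main(String[] args)
  {
    // The empty-string key catches null dimension values; any other miss
    // falls back to replaceMissingValueWith ("oof" here).
    LookupExtractionFn fn = new LookupExtractionFn(
        new MapLookupExtractor(ImmutableMap.of("", "bar", "bat", "baz")),
        false, // retainMissingValue
        "oof", // replaceMissingValueWith
        false, // injective
        false  // optimize
    );
    System.out.println(fn.apply(null));  // bar -- null is looked up as ""
    System.out.println(fn.apply("foo")); // oof -- not in the map, replaced
    System.out.println(fn.apply("bat")); // baz
  }
}
```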

View File

@ -26,7 +26,7 @@ Namespaced lookups are appropriate for lookups which are not possible to pass at
"table": "lookupTable", "keyColumn": "mykeyColumn", "valueColumn": "MyValueColumn", "tsColumn": "timeColumn"}]
```
Proper funcitonality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes:
Proper functionality of Namespaced lookups requires the following extension to be loaded on the broker, peon, and historical nodes:
`io.druid.extensions:druid-namespace-lookup`
## Cache Settings

View File

@ -41,6 +41,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* potentially merges the stream of ordered ResultType objects.
*
* @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending)
*
* @return a QueryRunner that potentially merges the stream of ordered ResultType objects
*/
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
@ -48,15 +49,16 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* This method doesn't belong here, but it's here for now just to make it work. The method needs to
* take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending)
*
* <p>
* This method assumes that its input sequences provide values already in sorted order.
* Even more specifically, it assumes that the individual sequences are also ordered by their first element.
*
* <p>
* In the vast majority of cases, this should just be implemented with:
*
* return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
* <p>
* return new OrderedMergeSequence<>(getOrdering(), seqOfSequences);
*
* @param seqOfSequences sequence of sequences to be merged
*
* @return the sequence of merged results
*/
public abstract Sequence<ResultType> mergeSequences(Sequence<Sequence<ResultType>> seqOfSequences);
@ -64,15 +66,16 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* This method doesn't belong here, but it's here for now just to make it work. The method needs to
* take a Sequence of Sequences and return a single Sequence of ResultType objects in time-order (ascending)
*
* <p>
* This method assumes that its input sequences provide values already in sorted order, but, unlike
* mergeSequences, it does *not* assume that the individual sequences are also ordered by their first element.
*
* <p>
* In the vast majority of cases, this should just be implemented with:
*
* return new MergeSequence<>(getOrdering(), seqOfSequences);
* <p>
* return new MergeSequence<>(getOrdering(), seqOfSequences);
*
* @param seqOfSequences sequence of sequences to be merged
*
* @return the sequence of merged results
*/
public abstract Sequence<ResultType> mergeSequencesUnordered(Sequence<Sequence<ResultType>> seqOfSequences);
@ -85,6 +88,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* a TopN query or the number of dimensions included for a groupBy query.
*
* @param query The query that is being processed
*
* @return A MetricEvent.Builder that can be used to make metrics for the provided query
*/
public abstract ServiceMetricEvent.Builder makeMetricBuilder(QueryType query);
@ -92,15 +96,16 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Creates a Function that can take in a ResultType and return a new ResultType having applied
* the MetricManipulatorFn to each of the metrics.
*
* <p>
* This exists because the QueryToolChest is the only thing that understands the internal serialization
* format of ResultType, so its primary responsibility is to "decompose" that structure and apply the
* given function to all metrics.
*
* <p>
* This function is called very early in the processing pipeline on the Broker.
*
* @param query The Query that is currently being processed
* @param fn The function that should be applied to all metrics in the results
* @param fn The function that should be applied to all metrics in the results
*
* @return A function that will apply the provided fn to all metrics in the input ResultType object
*/
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
@ -112,11 +117,12 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* Generally speaking this is the exact same thing as makePreComputeManipulatorFn. It is leveraged in
* order to compute PostAggregators on results after they have been completely merged together, which
* should actually be done in the mergeResults() call instead of here.
*
* <p>
* This should never actually be overridden and it should be removed as quickly as possible.
*
* @param query The Query that is currently being processed
* @param fn The function that should be applied to all metrics in the results
* @param fn The function that should be applied to all metrics in the results
*
* @return A function that will apply the provided fn to all metrics in the input ResultType object
*/
public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
@ -134,11 +140,12 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Returns a CacheStrategy to be used to load data into the cache and remove it from the cache.
*
* <p>
* This is optional. If it returns null, caching is effectively disabled for the query.
*
* @param query The query whose results might be cached
* @param <T> The type of object that will be stored in the cache
* @param <T> The type of object that will be stored in the cache
*
* @return A CacheStrategy that can be used to populate and read from the Cache
*/
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
@ -149,14 +156,15 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists *before* being passed to
* mergeResults().
*
* <p>
* In fact, the return value of this method is always passed to mergeResults, so it is equivalent to
* just implementing this functionality as extra decoration on the QueryRunner during mergeResults().
*
* <p>
* In the interests of potentially simplifying these interfaces, the recommendation is to actually not
* override this method and instead apply anything that might be needed here in the mergeResults() call.
*
* @param runner The runner to be wrapped
*
* @return The wrapped runner
*/
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner)
@ -166,14 +174,15 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists coming out of mergeResults()
*
* <p>
* In fact, the input value of this method is always the return value from mergeResults, so it is equivalent
* to just implementing this functionality as extra decoration on the QueryRunner during mergeResults().
*
* <p>
* In the interests of potentially simplifying these interfaces, the recommendation is to actually not
* override this method and instead apply anything that might be needed here in the mergeResults() call.
*
* @param runner The runner to be wrapped
*
* @return The wrapped runner
*/
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner)
@ -186,9 +195,10 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* be queried. It can use whatever criteria it wants in order to do the pruning; it just needs to
* return the list of Segments it actually wants to see queried.
*
* @param query The query being processed
* @param query The query being processed
* @param segments The list of candidate segments to be queried
* @param <T> A Generic parameter because Java is cool
* @param <T> A Generic parameter because Java is cool
*
* @return The list of segments to actually query
*/
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments)

View File

@ -36,18 +36,15 @@ public class LookupExtractionFn extends FunctionalExtraction
private static final byte CACHE_TYPE_ID = 0x7;
private final LookupExtractor lookup;
private final boolean optimize;
@JsonCreator
public LookupExtractionFn(
@JsonProperty("lookup")
final LookupExtractor lookup,
@JsonProperty("retainMissingValue")
final boolean retainMissingValue,
@Nullable
@JsonProperty("replaceMissingValueWith")
final String replaceMissingValueWith,
@JsonProperty("injective")
final boolean injective
@JsonProperty("lookup") final LookupExtractor lookup,
@JsonProperty("retainMissingValue") final boolean retainMissingValue,
@Nullable @JsonProperty("replaceMissingValueWith") final String replaceMissingValueWith,
@JsonProperty("injective") final boolean injective,
@JsonProperty("optimize") Boolean optimize
)
{
super(
@ -65,6 +62,7 @@ public class LookupExtractionFn extends FunctionalExtraction
injective
);
this.lookup = lookup;
this.optimize = optimize == null ? false : optimize;
}
@ -89,6 +87,12 @@ public class LookupExtractionFn extends FunctionalExtraction
return super.isInjective();
}
@JsonProperty("optimize")
public boolean isOptimize()
{
return optimize;
}
@Override
public byte[] getCacheKey()
{
@ -98,9 +102,11 @@ public class LookupExtractionFn extends FunctionalExtraction
outputStream.write(lookup.getCacheKey());
if (getReplaceMissingValueWith() != null) {
outputStream.write(StringUtils.toUtf8(getReplaceMissingValueWith()));
outputStream.write(0xFF);
}
outputStream.write(isInjective() ? 1 : 0);
outputStream.write(isRetainMissingValue() ? 1 : 0);
outputStream.write(isOptimize() ? 1 : 0);
return outputStream.toByteArray();
}
catch (IOException ex) {
@ -115,19 +121,24 @@ public class LookupExtractionFn extends FunctionalExtraction
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (!(o instanceof LookupExtractionFn)) {
return false;
}
LookupExtractionFn that = (LookupExtractionFn) o;
return lookup.equals(that.lookup);
if (isOptimize() != that.isOptimize()) {
return false;
}
return getLookup().equals(that.getLookup());
}
@Override
public int hashCode()
{
return lookup.hashCode();
int result = getLookup().hashCode();
result = 31 * result + (isOptimize() ? 1 : 0);
return result;
}
}

View File

@ -78,7 +78,7 @@ public abstract class LookupExtractor
* On the other hand, returning a list containing a null element implies the user wants to map a non-existing value to the key null.
*/
abstract List<String> unapply(String value);
public abstract List<String> unapply(String value);
/**
* @param values Iterable of values on which the reverse lookup will be performed
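For illustration, a sketch of the reverse lookup on a map-based extractor (assuming, per the contract above, an empty list when no key maps to the value; the map mirrors the broker-side rewrite example in the docs):
```java
import com.google.common.collect.ImmutableMap;

import io.druid.query.extraction.LookupExtractor;
import io.druid.query.extraction.MapLookupExtractor;

public class UnapplySketch
{
  public static void main(String[] args)
  {
    LookupExtractor lookup = new MapLookupExtractor(
        ImmutableMap.of("product_1", "bar_1", "product_3", "bar_1")
    );
    // All keys that map to the given value:
    System.out.println(lookup.unapply("bar_1")); // [product_1, product_3]
    // No key maps to this value, so the result is empty:
    System.out.println(lookup.unapply("nope")); // []
  }
}
```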

View File

@ -21,8 +21,11 @@ package io.druid.query.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.query.Druids;
import java.util.Collections;
import java.util.List;
@ -57,6 +60,19 @@ public class AndDimFilter implements DimFilter
return DimFilterCacheHelper.computeCacheKey(DimFilterCacheHelper.AND_CACHE_ID, fields);
}
@Override
public DimFilter optimize()
{
return Druids.newAndDimFilterBuilder().fields(Lists.transform(this.getFields(), new Function<DimFilter, DimFilter>()
{
@Override
public DimFilter apply(DimFilter input)
{
return input.optimize();
}
})).build();
}
@Override
public boolean equals(Object o)
{

View File

@ -127,6 +127,12 @@ public class BoundDimFilter implements DimFilter
return boundCacheBuffer.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@Override
public boolean equals(Object o)
{

View File

@ -42,4 +42,10 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
public interface DimFilter
{
public byte[] getCacheKey();
/**
* @return an optimized version of this filter.
* Returning the same filter is a straightforward default implementation.
*/
public DimFilter optimize();
}

View File

@ -21,11 +21,16 @@ package io.druid.query.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.StringUtils;
import io.druid.query.extraction.ExtractionFn;
import io.druid.query.extraction.LookupExtractionFn;
import io.druid.query.extraction.LookupExtractor;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
@ -89,6 +94,28 @@ public class ExtractionDimFilter implements DimFilter
.array();
}
@Override
public DimFilter optimize()
{
if (this.getExtractionFn() instanceof LookupExtractionFn
&& ((LookupExtractionFn) this.getExtractionFn()).isOptimize()) {
LookupExtractor lookup = ((LookupExtractionFn) this.getExtractionFn()).getLookup();
final List<String> keys = lookup.unapply(this.getValue());
final String dimensionName = this.getDimension();
if (!keys.isEmpty()) {
return new OrDimFilter(Lists.transform(keys, new Function<String, DimFilter>()
{
@Override
public DimFilter apply(String input)
{
return new SelectorDimFilter(dimensionName, input);
}
}));
}
}
return this;
}
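Putting it together: with `optimize = true`, the filter from the docs example is rewritten by reversing the lookup. A minimal sketch (class name illustrative):
```java
import com.google.common.collect.ImmutableMap;

import io.druid.query.extraction.LookupExtractionFn;
import io.druid.query.extraction.MapLookupExtractor;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.ExtractionDimFilter;

public class ExtractionOptimizeSketch
{
  public static void main(String[] args)
  {
    LookupExtractionFn fn = new LookupExtractionFn(
        new MapLookupExtractor(ImmutableMap.of("product_1", "bar_1", "product_3", "bar_1")),
        false, null, false, true // optimize = true
    );
    DimFilter filter = new ExtractionDimFilter("product", "bar_1", fn, null);
    // unapply("bar_1") -> [product_1, product_3], so optimize() returns
    // OR(product = product_1, product = product_3). With optimize = false,
    // or when no key maps to the value, the filter is returned unchanged.
    System.out.println(filter.optimize());
  }
}
```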
@Override
public String toString()
{

View File

@ -77,6 +77,12 @@ public class InDimFilter implements DimFilter
return filterCacheKey.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@Override
public int hashCode()
{

View File

@ -69,6 +69,12 @@ public class JavaScriptDimFilter implements DimFilter
.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@Override
public String toString()
{

View File

@ -30,4 +30,10 @@ public class NoopDimFilter implements DimFilter
{
return ByteBuffer.allocate(1).put(DimFilterCacheHelper.NOOP_CACHE_ID).array();
}
@Override
public DimFilter optimize()
{
return this;
}
}

View File

@ -22,6 +22,7 @@ package io.druid.query.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.query.Druids;
import java.nio.ByteBuffer;
@ -54,6 +55,12 @@ public class NotDimFilter implements DimFilter
return ByteBuffer.allocate(1 + subKey.length).put(DimFilterCacheHelper.NOT_CACHE_ID).put(subKey).array();
}
@Override
public DimFilter optimize()
{
return Druids.newNotDimFilterBuilder().field(this.getField().optimize()).build();
}
@Override
public boolean equals(Object o)
{

View File

@ -21,8 +21,11 @@ package io.druid.query.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import io.druid.query.Druids;
import java.util.Collections;
import java.util.List;
@ -57,6 +60,19 @@ public class OrDimFilter implements DimFilter
return DimFilterCacheHelper.computeCacheKey(DimFilterCacheHelper.OR_CACHE_ID, fields);
}
@Override
public DimFilter optimize()
{
return Druids.newOrDimFilterBuilder().fields(Lists.transform(this.getFields(), new Function<DimFilter, DimFilter>()
{
@Override
public DimFilter apply(DimFilter input)
{
return input.optimize();
}
})).build();
}
@Override
public boolean equals(Object o)
{

View File

@ -71,6 +71,12 @@ public class RegexDimFilter implements DimFilter
.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@Override
public String toString()
{

View File

@ -71,6 +71,12 @@ public class SearchQueryDimFilter implements DimFilter
.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@Override
public String toString()
{

View File

@ -59,6 +59,12 @@ public class SelectorDimFilter implements DimFilter
.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@JsonProperty
public String getDimension()
{

View File

@ -60,6 +60,12 @@ public class SpatialDimFilter implements DimFilter
.array();
}
@Override
public DimFilter optimize()
{
return this;
}
@JsonProperty
public String getDimension()
{

View File

@ -259,6 +259,23 @@ public class GroupByQuery extends BaseQuery<Row>
);
}
public GroupByQuery withDimFilter(final DimFilter dimFilter)
{
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
dimFilter,
getGranularity(),
getDimensions(),
getAggregatorSpecs(),
getPostAggregatorSpecs(),
getHavingSpec(),
getLimitSpec(),
limitFn,
getContext()
);
}
@Override
public Query<Row> withDataSource(DataSource dataSource)
{

View File

@ -175,14 +175,16 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
// subsequent ones) and return an error if the aggregator types are different.
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
for (final AggregatorFactory transferAgg : aggregatorFactory.getRequiredColumns()) {
if (Iterables.any(aggs, new Predicate<AggregatorFactory>() {
if (Iterables.any(aggs, new Predicate<AggregatorFactory>()
{
@Override
public boolean apply(AggregatorFactory agg) {
public boolean apply(AggregatorFactory agg)
{
return agg.getName().equals(transferAgg.getName()) && !agg.equals(transferAgg);
}
})) {
throw new IAE("Inner aggregator can currently only be referenced by a single type of outer aggregator" +
" for '%s'", transferAgg.getName());
" for '%s'", transferAgg.getName());
}
aggs.add(transferAgg);
@ -417,10 +419,14 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return runner.run(query, responseContext);
}
GroupByQuery groupByQuery = (GroupByQuery) query;
if (groupByQuery.getDimFilter() != null) {
groupByQuery = groupByQuery.withDimFilter(groupByQuery.getDimFilter().optimize());
}
final GroupByQuery delegateGroupByQuery = groupByQuery;
ArrayList<DimensionSpec> dimensionSpecs = new ArrayList<>();
Set<String> optimizedDimensions = ImmutableSet.copyOf(
Iterables.transform(
extractionsToRewrite(groupByQuery),
extractionsToRewrite(delegateGroupByQuery),
new Function<DimensionSpec, String>()
{
@Override
@ -431,7 +437,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
)
);
for (DimensionSpec dimensionSpec : groupByQuery.getDimensions()) {
for (DimensionSpec dimensionSpec : delegateGroupByQuery.getDimensions()) {
if (optimizedDimensions.contains(dimensionSpec.getDimension())) {
dimensionSpecs.add(
new DefaultDimensionSpec(dimensionSpec.getDimension(), dimensionSpec.getOutputName())
@ -441,7 +447,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
}
}
return runner.run(
groupByQuery.withDimensionSpecs(dimensionSpecs),
delegateGroupByQuery.withDimensionSpecs(dimensionSpecs),
responseContext
);
}

View File

@ -241,6 +241,22 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
);
}
public TopNQuery withDimFilter(DimFilter dimFilter)
{
return new TopNQuery(
getDataSource(),
getDimensionSpec(),
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
}
@Override
public String toString()
{

View File

@ -330,7 +330,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return ByteBuffer
.allocate(
1 + dimensionSpecBytes.length + metricSpecBytes.length + 4 +
granularityBytes.length + filterBytes.length + aggregatorBytes.length
granularityBytes.length + filterBytes.length + aggregatorBytes.length
)
.put(TOPN_QUERY)
.put(dimensionSpecBytes)
@ -440,11 +440,15 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
if (!(query instanceof TopNQuery)) {
return runner.run(query, responseContext);
} else {
final TopNQuery topNQuery = (TopNQuery) query;
if (TopNQueryEngine.canApplyExtractionInPost(topNQuery)) {
final DimensionSpec dimensionSpec = topNQuery.getDimensionSpec();
TopNQuery topNQuery = (TopNQuery) query;
if (topNQuery.getDimensionsFilter() != null) {
topNQuery = topNQuery.withDimFilter(topNQuery.getDimensionsFilter().optimize());
}
final TopNQuery delegateTopNQuery = topNQuery;
if (TopNQueryEngine.canApplyExtractionInPost(delegateTopNQuery)) {
final DimensionSpec dimensionSpec = delegateTopNQuery.getDimensionSpec();
return runner.run(
topNQuery.withDimensionSpec(
delegateTopNQuery.withDimensionSpec(
new DefaultDimensionSpec(
dimensionSpec.getDimension(),
dimensionSpec.getOutputName()
@ -452,7 +456,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
), responseContext
);
} else {
return runner.run(query, responseContext);
return runner.run(delegateTopNQuery, responseContext);
}
}
}
@ -499,7 +503,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
{
@Override
public DimensionAndMetricValueExtractor apply(
DimensionAndMetricValueExtractor input
DimensionAndMetricValueExtractor input
)
{
String dimOutputName = topNQuery.getDimensionSpec().getOutputName();

View File

@ -37,6 +37,7 @@ public class LookupExtractionFnExpectationsTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
true,
null,
false,
false
);
Assert.assertNull(lookupExtractionFn.apply(null));
@ -49,6 +50,7 @@ public class LookupExtractionFnExpectationsTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
false,
"REPLACE",
false,
false
);
Assert.assertEquals("REPLACE", lookupExtractionFn.apply(null));
@ -61,6 +63,7 @@ public class LookupExtractionFnExpectationsTest
new MapLookupExtractor(ImmutableMap.of("", "bar")),
false,
"REPLACE",
false,
false
);
Assert.assertEquals("bar", lookupExtractionFn.apply(null));
@ -73,6 +76,7 @@ public class LookupExtractionFnExpectationsTest
new MapLookupExtractor(ImmutableMap.of("foo", "")),
false,
"REPLACE",
false,
false
);
Assert.assertEquals("REPLACE", lookupExtractionFn.apply(null));

View File

@ -90,13 +90,15 @@ public class LookupExtractionFnTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
replaceMissing,
injective
injective,
false
);
final LookupExtractionFn lookupExtractionFn2 = new LookupExtractionFn(
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
replaceMissing,
injective
injective,
false
);
@ -104,7 +106,8 @@ public class LookupExtractionFnTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar2")),
retainMissing,
replaceMissing,
injective
injective,
false
);
Assert.assertEquals(lookupExtractionFn1, lookupExtractionFn2);
@ -124,7 +127,8 @@ public class LookupExtractionFnTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
replaceMissing,
injective
injective,
false
);
final String str1 = OBJECT_MAPPER.writeValueAsString(lookupExtractionFn);
@ -150,7 +154,8 @@ public class LookupExtractionFnTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
Strings.emptyToNull(replaceMissing),
injective
injective,
false
);
} else {
throw new IAE("Case not valid");
@ -171,7 +176,8 @@ public class LookupExtractionFnTest
new MapLookupExtractor(ImmutableMap.of("foo", "bar")),
retainMissing,
replaceMissing,
injective
injective,
false
);
if (Strings.isNullOrEmpty(replaceMissing) || retainMissing) {
@ -182,7 +188,8 @@ public class LookupExtractionFnTest
lookupExtractionFn.getLookup(),
!lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
lookupExtractionFn.isInjective()
lookupExtractionFn.isInjective(),
false
).getCacheKey()
)
);
@ -193,7 +200,8 @@ public class LookupExtractionFnTest
lookupExtractionFn.getLookup(),
!lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
!lookupExtractionFn.isInjective()
!lookupExtractionFn.isInjective(),
false
).getCacheKey()
)
);
@ -205,7 +213,8 @@ public class LookupExtractionFnTest
new MapLookupExtractor(weirdMap),
lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
lookupExtractionFn.isInjective()
lookupExtractionFn.isInjective(),
false
).getCacheKey()
)
);
@ -216,7 +225,8 @@ public class LookupExtractionFnTest
lookupExtractionFn.getLookup(),
lookupExtractionFn.isRetainMissingValue(),
lookupExtractionFn.getReplaceMissingValueWith(),
!lookupExtractionFn.isInjective()
!lookupExtractionFn.isInjective(),
false
).getCacheKey()
)
);

View File

@ -268,7 +268,7 @@ public class GroupByQueryRunnerTest
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), false, null, false), null
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), false, null, false, false), null
)
)
)
@ -344,7 +344,7 @@ public class GroupByQueryRunnerTest
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), true, null, false), null
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), true, null, false, false), null
)
)
)
@ -420,7 +420,7 @@ public class GroupByQueryRunnerTest
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), true, null, true), null
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), true, null, true, false), null
)
)
)
@ -498,7 +498,7 @@ public class GroupByQueryRunnerTest
new ExtractionDimensionSpec(
"quality",
"alias",
new LookupExtractionFn(new MapLookupExtractor(map), false, "MISSING", true),
new LookupExtractionFn(new MapLookupExtractor(map), false, "MISSING", true, false),
null
)
)
@ -574,7 +574,7 @@ public class GroupByQueryRunnerTest
.setDimensions(
Lists.<DimensionSpec>newArrayList(
new ExtractionDimensionSpec(
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), false, null, true), null
"quality", "alias", new LookupExtractionFn(new MapLookupExtractor(map), false, null, true, false), null
)
)
)
@ -3923,7 +3923,8 @@ public class GroupByQueryRunnerTest
"mezzanine",
"mezzanine0"
)
), false, null, false
), false, null, false,
false
),
null
)
@ -3996,7 +3997,8 @@ public class GroupByQueryRunnerTest
"mezzanine",
"mezzanine0"
)
), false, null, true
), false, null, true,
false
),
null
)
@ -4042,7 +4044,7 @@ public class GroupByQueryRunnerTest
extractionMap.put("news", "automotiveAndBusinessAndNewsAndMezzanine");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, false);
List<DimFilter> dimFilters = Lists.<DimFilter>newArrayList(
new ExtractionDimFilter("quality", "automotiveAndBusinessAndNewsAndMezzanine", lookupExtractionFn, null),
@ -4114,7 +4116,7 @@ public class GroupByQueryRunnerTest
extractionMap.put("travel", "travel0");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, false);
GroupByQuery query = GroupByQuery.builder().setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(
@ -4150,7 +4152,7 @@ public class GroupByQueryRunnerTest
{
Map<String, String> extractionMap = new HashMap<>();
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, false);
GroupByQuery query = GroupByQuery.builder().setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
@ -4191,7 +4193,7 @@ public class GroupByQueryRunnerTest
extractionMap.put("", "NULLorEMPTY");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, false);
GroupByQuery query = GroupByQuery.builder().setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
@ -4243,7 +4245,7 @@ public class GroupByQueryRunnerTest
extractionMap.put("travel", "travel0");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, "missing", true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, "missing", true, false);
DimFilter filter = new ExtractionDimFilter("quality", "mezzanineANDnews", lookupExtractionFn, null);
GroupByQuery query = GroupByQuery.builder().setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
@ -4295,4 +4297,54 @@ public class GroupByQueryRunnerTest
}
@Test
public void testGroupByWithExtractionDimFilterOptimizationManyToOne()
{
Map<String, String> extractionMap = new HashMap<>();
extractionMap.put("mezzanine", "newsANDmezzanine");
extractionMap.put("news", "newsANDmezzanine");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
GroupByQuery query = GroupByQuery.builder().setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
.setAggregatorSpecs(
Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")))
.setGranularity(QueryRunnerTestHelper.dayGran)
.setDimFilter(new ExtractionDimFilter("quality", "newsANDmezzanine", lookupExtractionFn, null))
.build();
List<Row> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L));
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
@Test
public void testGroupByWithExtractionDimFilterNullDims()
{
Map<String, String> extractionMap = new HashMap<>();
extractionMap.put("", "EMPTY");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
GroupByQuery query = GroupByQuery.builder().setDataSource(QueryRunnerTestHelper.dataSource)
.setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
.setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("null_column", "alias")))
.setAggregatorSpecs(
Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index")))
.setGranularity(QueryRunnerTestHelper.dayGran)
.setDimFilter(new ExtractionDimFilter("null_column", "EMPTY", lookupExtractionFn, null)).build();
List<Row> expectedResults = Arrays
.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", null, "rows", 13L, "idx", 6619L),
GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", null, "rows", 13L, "idx", 5827L));
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "");
}
}

View File

@ -257,7 +257,8 @@ public class SearchQueryRunnerTest
new MapLookupExtractor(ImmutableMap.of("automotive", automotiveSnowman)),
true,
null,
true
true,
false
);
checkSearchQuery(

View File

@ -1630,7 +1630,8 @@ public class TopNQueryRunnerTest
"total_market", "1total_market0",
"upfront", "3upfront0"
)
), false, "MISSING", true
), false, "MISSING", true,
false
),
null
)
@ -1693,7 +1694,8 @@ public class TopNQueryRunnerTest
"total_market", "1total_market0",
"upfront", "3upfront0"
)
), false, "MISSING", false
), false, "MISSING", false,
false
),
null
)
@ -1757,7 +1759,8 @@ public class TopNQueryRunnerTest
"total_market", "1total_market0",
"upfront", "3upfront0"
)
), true, null, true
), true, null, true,
false
),
null
)
@ -1823,7 +1826,8 @@ public class TopNQueryRunnerTest
"upfront",
"upfront0"
)
), true, null, false
), true, null, false,
false
),
null
)
@ -1888,7 +1892,8 @@ public class TopNQueryRunnerTest
"upfront",
"1upfront"
)
), true, null, true
), true, null, true,
false
),
null
)
@ -1953,7 +1958,8 @@ public class TopNQueryRunnerTest
"upfront",
"1upfront"
)
), true, null, false
), true, null, false,
false
),
null
)
@ -2019,7 +2025,8 @@ public class TopNQueryRunnerTest
"upfront",
"1upfront"
)
), true, null, true
), true, null, true,
false
),
null
)
@ -3159,7 +3166,7 @@ public class TopNQueryRunnerTest
Map<String, String> extractionMap = new HashMap<>();
extractionMap.put("spot", "spot0");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, false);
TopNQuery query = new TopNQueryBuilder().dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
@ -3197,6 +3204,9 @@ public class TopNQueryRunnerTest
);
assertExpectedResults(expectedResults, query);
// Assert the optimization path as well
final Sequence<Result<TopNResultValue>> retval = runWithPreMergeAndMerge(query);
TestHelper.assertExpectedResults(expectedResults, retval);
}
@Test
@ -3206,7 +3216,7 @@ public class TopNQueryRunnerTest
extractionMap.put("", "NULL");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, false);
DimFilter extractionFilter = new ExtractionDimFilter("null_column", "NULL", lookupExtractionFn, null);
TopNQueryBuilder topNQueryBuilder = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
@ -3254,4 +3264,67 @@ public class TopNQueryRunnerTest
assertExpectedResults(expectedResults, topNQueryWithNULLValueExtraction);
}
private Sequence<Result<TopNResultValue>> runWithPreMergeAndMerge(TopNQuery query)
{
return runWithPreMergeAndMerge(query, ImmutableMap.<String, Object>of());
}
private Sequence<Result<TopNResultValue>> runWithPreMergeAndMerge(TopNQuery query, Map<String, Object> context)
{
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
final QueryRunner<Result<TopNResultValue>> mergedRunner = chest.mergeResults(chest.preMergeQueryDecoration(runner));
return mergedRunner.run(query, context);
}
@Test
public void testTopNWithExtractionFilterNoExistingValue()
{
Map<String, String> extractionMap = new HashMap<>();
extractionMap.put("","NULL");
MapLookupExtractor mapLookupExtractor = new MapLookupExtractor(extractionMap);
LookupExtractionFn lookupExtractionFn = new LookupExtractionFn(mapLookupExtractor, false, null, true, true);
DimFilter extractionFilter = new ExtractionDimFilter("null_column", "NULL", lookupExtractionFn, null);
TopNQueryBuilder topNQueryBuilder = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.dimension("null_column")
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(4)
.intervals(QueryRunnerTestHelper.fullOnInterval)
.aggregators(Lists.newArrayList(Iterables.concat(QueryRunnerTestHelper.commonAggregators, Lists.newArrayList(
new FilteredAggregatorFactory(new DoubleMaxAggregatorFactory("maxIndex", "index"),
extractionFilter),
//new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")))))
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant));
TopNQuery topNQueryWithNULLValueExtraction = topNQueryBuilder
.filters(extractionFilter)
.build();
Map<String, Object> map = Maps.newHashMap();
map.put("null_column", null);
map.put("rows", 1209L);
map.put("index", 503332.5071372986D);
map.put("addRowsIndexConstant", 504542.5071372986D);
map.put("uniques", QueryRunnerTestHelper.UNIQUES_9);
map.put("maxIndex", 1870.06103515625D);
map.put("minIndex", 59.02102279663086D);
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
new DateTime("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(
map
)
)
)
);
assertExpectedResults(expectedResults, topNQueryWithNULLValueExtraction);
// Assert the optimization path as well
final Sequence<Result<TopNResultValue>> retval = runWithPreMergeAndMerge(topNQueryWithNULLValueExtraction);
TestHelper.assertExpectedResults(expectedResults, retval);
}
}

View File

@ -93,7 +93,7 @@ public class TopNQueryTest
new ExtractionDimensionSpec(
marketDimension,
marketDimension,
new LookupExtractionFn(new MapLookupExtractor(ImmutableMap.of("foo", "bar")), true, null, false),
new LookupExtractionFn(new MapLookupExtractor(ImmutableMap.of("foo", "bar")), true, null, false, false),
null
)
)