mirror of https://github.com/apache/druid.git

Add support for filtering at the DimensionSpec level so that multi-valued dimensions can be filtered correctly; also adds UTs for multi-valued dimensions.

parent fa5c3bb014
commit b47d807738
@@ -252,3 +252,23 @@ A null dimension value can be mapped to a specific value by specifying the empty

This allows distinguishing between a null dimension and a lookup resulting in a null.

For example, specifying `{"":"bar","bat":"baz"}` with dimension values `[null, "foo", "bat"]` and replacing missing values with `"oof"` will yield results of `["bar", "oof", "baz"]`.

Omitting the empty string key will cause the missing value to take over. For example, specifying `{"bat":"baz"}` with dimension values `[null, "foo", "bat"]` and replacing missing values with `"oof"` will yield results of `["oof", "oof", "baz"]`.

### Filtering DimensionSpecs

These are only valid for multi-valued dimensions. Suppose you have a row in Druid with a multi-valued dimension whose values are ["v1", "v2", "v3"], and you send a groupBy/topN query grouping by that dimension with a [query filter](filter.html) for the value "v1". The response will contain three rows, for "v1", "v2" and "v3". This behavior might be unintuitive for some use cases.

It happens because the query filter is internally applied to bitmaps and is only used to decide which rows are included in query result processing. With multi-valued dimensions, the query filter behaves like a contains check, so it matches the row with dimension values ["v1", "v2", "v3"]. Please see the section on "Multi-value columns" in [segment](../design/segments.html) for more details.

The groupBy/topN processing pipeline then "explodes" all multi-valued dimensions, resulting in one row each for "v1", "v2" and "v3".

In addition to the query filter, which efficiently selects the rows to be processed, you can use a filtering dimension spec to filter for specific values within the values of a multi-valued dimension. These dimensionSpecs take a delegate DimensionSpec and filtering criteria. From the "exploded" rows, only rows matching the given filtering criteria are returned in the query result.

The following filtered dimension spec acts as a whitelist or blacklist of values, depending on the "isWhitelist" attribute value.

```json
{ "type" : "listFiltered", "delegate" : <dimensionSpec>, "values": <array of strings>, "isWhitelist": <optional attribute for true/false, default is true> }
```

The following filtered dimension spec retains only the values matching the given regex. Note that `listFiltered` is faster than this and should be preferred for the whitelist or blacklist use case.

```json
{ "type" : "regexFiltered", "delegate" : <dimensionSpec>, "pattern": <java regex pattern> }
```

For more details and examples, see [multi-valued dimensions](multi-valued-dimensions.html).
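As a concrete illustration of the `listFiltered` template above, here is a minimal sketch of the equivalent programmatic construction; the dimension name `dim` is illustrative, not taken from the commit.

```java
import com.google.common.collect.ImmutableList;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ListFilteredDimensionSpec;

class FilteringDimensionSpecExample
{
  // Equivalent of: { "type": "listFiltered",
  //                  "delegate": { "type": "default", "dimension": "dim", "outputName": "dim" },
  //                  "values": ["v1"] }
  static DimensionSpec keepOnlyV1()
  {
    return new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("dim", "dim"), // delegate to a plain dimension
        ImmutableList.of("v1"),                 // values to whitelist
        null                                    // isWhitelist: null defaults to true
    );
  }
}
```

In the ["v1", "v2", "v3"] scenario above, grouping on this spec would return only the "v1" row.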
BaseFilteredDimensionSpec.java (new file)
@@ -0,0 +1,68 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.dimension;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.query.extraction.ExtractionFn;

/**
 */
public abstract class BaseFilteredDimensionSpec implements DimensionSpec
{
  protected final DimensionSpec delegate;

  public BaseFilteredDimensionSpec(
      @JsonProperty("delegate") DimensionSpec delegate
  )
  {
    this.delegate = Preconditions.checkNotNull(delegate, "delegate must not be null");
  }

  @JsonProperty
  public DimensionSpec getDelegate()
  {
    return delegate;
  }

  @Override
  public String getDimension()
  {
    return delegate.getDimension();
  }

  @Override
  public String getOutputName()
  {
    return delegate.getOutputName();
  }

  @Override
  public ExtractionFn getExtractionFn()
  {
    return delegate.getExtractionFn();
  }

  @Override
  public boolean preservesOrdering()
  {
    return delegate.preservesOrdering();
  }
}
```
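A hypothetical minimal subclass, not part of the commit, to illustrate the surface a concrete filtered spec must supply on top of this base class; judging from this diff, everything else is forwarded to the delegate, and subclasses implement `decorate()` and `getCacheKey()`.

```java
import io.druid.query.dimension.BaseFilteredDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.segment.DimensionSelector;

// Hypothetical: a no-op filtered spec that performs no filtering at all.
class PassThroughFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
  public PassThroughFilteredDimensionSpec(DimensionSpec delegate)
  {
    super(delegate);
  }

  @Override
  public DimensionSelector decorate(DimensionSelector selector)
  {
    return selector; // keep every value of the multi-valued row
  }

  @Override
  public byte[] getCacheKey()
  {
    return delegate.getCacheKey(); // no extra state to mix into the key
  }
}
```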
DimensionSpec.java
@@ -29,7 +29,9 @@ import io.druid.segment.DimensionSelector;
```diff
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyDimensionSpec.class)
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(name = "default", value = DefaultDimensionSpec.class),
-    @JsonSubTypes.Type(name = "extraction", value = ExtractionDimensionSpec.class)
+    @JsonSubTypes.Type(name = "extraction", value = ExtractionDimensionSpec.class),
+    @JsonSubTypes.Type(name = "regexFiltered", value = RegexFilteredDimensionSpec.class),
+    @JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class)
 })
 public interface DimensionSpec
 {
```
ListFilteredDimensionSpec.java (new file)
@@ -0,0 +1,191 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.dimension;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.common.StringUtils;
import io.druid.query.filter.DimFilterCacheHelper;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListBasedIndexedInts;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 */
public class ListFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
  private static final byte CACHE_TYPE_ID = 0x3;

  private final List<String> values;
  private final boolean isWhitelist;

  public ListFilteredDimensionSpec(
      @JsonProperty("delegate") DimensionSpec delegate,
      @JsonProperty("values") List<String> values,
      @JsonProperty("isWhitelist") Boolean isWhitelist
  )
  {
    super(delegate);

    Preconditions.checkArgument(values != null && values.size() > 0, "values list must be non-empty");
    this.values = values;

    this.isWhitelist = isWhitelist == null ? true : isWhitelist.booleanValue();
  }

  @JsonProperty
  public List<String> getValues()
  {
    return values;
  }

  @JsonProperty("isWhitelist")
  public boolean isWhitelist()
  {
    return isWhitelist;
  }

  @Override
  public DimensionSelector decorate(final DimensionSelector selector)
  {
    if (selector == null) {
      return selector;
    }

    // Resolve the configured values to dictionary IDs once, up front.
    final Set<Integer> matched = new HashSet<>(values.size());
    for (String value : values) {
      int i = selector.lookupId(value);
      if (i >= 0) {
        matched.add(i);
      }
    }

    return new DimensionSelector()
    {
      @Override
      public IndexedInts getRow()
      {
        IndexedInts baseRow = selector.getRow();
        List<Integer> result = new ArrayList<>(baseRow.size());

        for (int i : baseRow) {
          // whitelist keeps matched IDs, blacklist keeps unmatched ones
          if (matched.contains(i) == isWhitelist) {
            result.add(i);
          }
        }

        return new ListBasedIndexedInts(result);
      }

      @Override
      public int getValueCardinality()
      {
        return matched.size();
      }

      @Override
      public String lookupName(int id)
      {
        return selector.lookupName(id);
      }

      @Override
      public int lookupId(String name)
      {
        return selector.lookupId(name);
      }
    };
  }

  @Override
  public byte[] getCacheKey()
  {
    byte[] delegateCacheKey = delegate.getCacheKey();

    byte[][] valuesBytes = new byte[values.size()][];
    int valuesBytesSize = 0;
    int index = 0;
    for (String value : values) {
      valuesBytes[index] = StringUtils.toUtf8(value);
      valuesBytesSize += valuesBytes[index].length + 1;
      ++index;
    }

    ByteBuffer filterCacheKey = ByteBuffer.allocate(3 + delegateCacheKey.length + valuesBytesSize)
                                          .put(CACHE_TYPE_ID)
                                          .put(delegateCacheKey)
                                          .put((byte) (isWhitelist ? 1 : 0))
                                          .put(DimFilterCacheHelper.STRING_SEPARATOR);
    for (byte[] bytes : valuesBytes) {
      filterCacheKey.put(bytes)
                    .put(DimFilterCacheHelper.STRING_SEPARATOR);
    }
    return filterCacheKey.array();
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    ListFilteredDimensionSpec that = (ListFilteredDimensionSpec) o;

    if (isWhitelist != that.isWhitelist) {
      return false;
    }
    // compare the delegate too, so specs over different dimensions are not equal
    if (!delegate.equals(that.delegate)) {
      return false;
    }
    return values.equals(that.values);
  }

  @Override
  public int hashCode()
  {
    int result = delegate.hashCode();
    result = 31 * result + values.hashCode();
    result = 31 * result + (isWhitelist ? 1 : 0);
    return result;
  }

  @Override
  public String toString()
  {
    return "ListFilteredDimensionSpec{" +
           "values=" + values +
           ", isWhitelist=" + isWhitelist +
           '}';
  }
}
```
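A minimal usage sketch, not part of the commit, assuming the multi-valued `tags` dimension from the tests below: the same spec acts as a whitelist or a blacklist depending on `isWhitelist`.

```java
import com.google.common.collect.ImmutableList;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.ListFilteredDimensionSpec;

class ListFilteredUsageSketch
{
  // Whitelist: keep only "t3" from the multi-valued "tags" dimension.
  static DimensionSpec onlyT3()
  {
    return new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("tags", "tags"),
        ImmutableList.of("t3"),
        true
    );
  }

  // Blacklist: keep every tag except "t3".
  static DimensionSpec allButT3()
  {
    return new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("tags", "tags"),
        ImmutableList.of("t3"),
        false
    );
  }
}
```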
RegexFilteredDimensionSpec.java (new file)
@@ -0,0 +1,162 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.dimension;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.metamx.common.StringUtils;
import io.druid.query.filter.DimFilterCacheHelper;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListBasedIndexedInts;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.regex.Pattern;

/**
 */
public class RegexFilteredDimensionSpec extends BaseFilteredDimensionSpec
{
  private static final byte CACHE_TYPE_ID = 0x2;

  private final String pattern;

  private final Pattern compiledRegex;

  public RegexFilteredDimensionSpec(
      @JsonProperty("delegate") DimensionSpec delegate,
      @JsonProperty("pattern") String pattern // values not matching the pattern will be discarded
  )
  {
    super(delegate);
    this.pattern = Preconditions.checkNotNull(pattern, "pattern must not be null");
    this.compiledRegex = Pattern.compile(pattern);
  }

  @JsonProperty
  public String getPattern()
  {
    return pattern;
  }

  @Override
  public DimensionSelector decorate(final DimensionSelector selector)
  {
    if (selector == null) {
      return selector;
    }

    // Precompute the set of dictionary IDs whose values match the regex.
    final BitSet bitSetOfIds = new BitSet(selector.getValueCardinality());
    for (int i = 0; i < selector.getValueCardinality(); i++) {
      if (compiledRegex.matcher(Strings.nullToEmpty(selector.lookupName(i))).matches()) {
        bitSetOfIds.set(i);
      }
    }

    return new DimensionSelector()
    {
      @Override
      public IndexedInts getRow()
      {
        IndexedInts baseRow = selector.getRow();
        List<Integer> result = new ArrayList<>(baseRow.size());

        for (int i : baseRow) {
          if (bitSetOfIds.get(i)) {
            result.add(i);
          }
        }

        return new ListBasedIndexedInts(result);
      }

      @Override
      public int getValueCardinality()
      {
        return bitSetOfIds.cardinality();
      }

      @Override
      public String lookupName(int id)
      {
        return selector.lookupName(id);
      }

      @Override
      public int lookupId(String name)
      {
        return selector.lookupId(name);
      }
    };
  }

  @Override
  public byte[] getCacheKey()
  {
    byte[] delegateCacheKey = delegate.getCacheKey();
    byte[] regexBytes = StringUtils.toUtf8(pattern);
    return ByteBuffer.allocate(2 + delegateCacheKey.length + regexBytes.length)
                     .put(CACHE_TYPE_ID)
                     .put(delegateCacheKey)
                     .put(DimFilterCacheHelper.STRING_SEPARATOR)
                     .put(regexBytes)
                     .array();
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    RegexFilteredDimensionSpec that = (RegexFilteredDimensionSpec) o;

    if (!delegate.equals(that.delegate)) {
      return false;
    }
    return pattern.equals(that.pattern);
  }

  @Override
  public int hashCode()
  {
    int result = delegate.hashCode();
    result = 31 * result + pattern.hashCode();
    return result;
  }

  @Override
  public String toString()
  {
    return "RegexFilteredDimensionSpec{" +
           "pattern='" + pattern + '\'' +
           '}';
  }
}
```
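A minimal sketch, not part of the commit, showing the matching semantics: `decorate` uses `Matcher.matches()`, so the pattern must match the entire dimension value, not just a substring.

```java
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.RegexFilteredDimensionSpec;

class RegexFilteredUsageSketch
{
  static DimensionSpec t1OrT3()
  {
    // Keeps "t1" and "t3" but drops "t2", because the whole value must match.
    // A bare "t" would keep nothing, since values like "t1" are longer than "t".
    return new RegexFilteredDimensionSpec(
        new DefaultDimensionSpec("tags", "tags"),
        "t[13]"
    );
  }
}
```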
DimFilterCacheHelper.java
@@ -24,7 +24,7 @@ import java.util.List;
```diff
 /**
  */
-class DimFilterCacheHelper
+public class DimFilterCacheHelper
 {
   static final byte NOOP_CACHE_ID = -0x4;
   static final byte SELECTOR_CACHE_ID = 0x0;
```
@@ -37,7 +37,7 @@ class DimFilterCacheHelper
```diff
   static final byte JAVASCRIPT_CACHE_ID = 0x7;
   static final byte SPATIAL_CACHE_ID = 0x8;
   static final byte IN_CACHE_ID = 0x9;
-  static final byte STRING_SEPARATOR = (byte) 0xFF;
+  public static final byte STRING_SEPARATOR = (byte) 0xFF;
   public static byte BOUND_CACHE_ID = 0xA;

   static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
```
ListBasedIndexedInts.java (new file)
@@ -0,0 +1,63 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.segment.data;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

/**
 */
public class ListBasedIndexedInts implements IndexedInts
{
  private final List<Integer> expansion;

  public ListBasedIndexedInts(List<Integer> expansion)
  {
    this.expansion = expansion;
  }

  @Override
  public int size()
  {
    return expansion.size();
  }

  @Override
  public int get(int index)
  {
    return expansion.get(index);
  }

  @Override
  public Iterator<Integer> iterator()
  {
    return new IndexedIntsIterator(this);
  }

  @Override
  public void fill(int index, int[] toFill)
  {
    throw new UnsupportedOperationException("fill not supported");
  }

  @Override
  public void close() throws IOException
  {
  }
}
```
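A minimal sketch, not part of the commit, of how the filtered selectors consume this class: wrap the surviving dictionary IDs as an `IndexedInts` row and iterate them.

```java
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListBasedIndexedInts;

import java.util.Arrays;

class ListBasedIndexedIntsSketch
{
  public static void main(String[] args)
  {
    // IDs 2, 5 and 7 stand in for the dictionary IDs that survived filtering.
    IndexedInts row = new ListBasedIndexedInts(Arrays.asList(2, 5, 7));
    for (int id : row) {
      System.out.println(id); // prints 2, 5, 7
    }
  }
}
```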
MultiValuedDimensionTest.java (new file)
@@ -0,0 +1,260 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query;

import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.Row;
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.dimension.RegexFilteredDimensionSpec;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 */
public class MultiValuedDimensionTest
{
  private AggregationTestHelper helper;

  private static IncrementalIndex incrementalIndex;
  private static QueryableIndex queryableIndex;

  private static File persistedSegmentDir;

  public MultiValuedDimensionTest() throws Exception
  {
    helper = new AggregationTestHelper(
        ImmutableList.<Module>of(), null
    );
  }

  @BeforeClass
  public static void setupClass() throws Exception
  {
    incrementalIndex = new OnheapIncrementalIndex(
        0,
        QueryGranularity.NONE,
        new AggregatorFactory[]{
            new CountAggregatorFactory("count")
        },
        true,
        5000
    );

    StringInputRowParser parser = new StringInputRowParser(
        new CSVParseSpec(
            new TimestampSpec("timestamp", "iso", null),
            new DimensionsSpec(ImmutableList.of("product", "tags"), null, null),
            "\t",
            ImmutableList.of("timestamp", "product", "tags")
        ),
        "UTF-8"
    );

    String[] rows = new String[]{
        "2011-01-12T00:00:00.000Z,product_1,t1\tt2\tt3",
        "2011-01-13T00:00:00.000Z,product_2,t3\tt4\tt5",
        "2011-01-14T00:00:00.000Z,product_3,t5\tt6\tt7",
    };

    for (String row : rows) {
      incrementalIndex.add(parser.parse(row));
    }

    persistedSegmentDir = Files.createTempDir();
    TestHelper.getTestIndexMerger()
              .persist(incrementalIndex, persistedSegmentDir, ImmutableMap.<String, Object>of(), new IndexSpec());

    queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);
  }

  @Test
  public void testGroupByNoFilter() throws Exception
  {
    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource("xx")
        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
        .setGranularity(QueryGranularity.ALL)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("tags", "tags")))
        .setAggregatorSpecs(
            Arrays.asList(
                new AggregatorFactory[]
                    {
                        new CountAggregatorFactory("count")
                    }
            )
        )
        .build();

    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
        ImmutableList.of(
            new QueryableIndexSegment("sid1", queryableIndex),
            new IncrementalIndexSegment(incrementalIndex, "sid2")
        ),
        query
    );

    List<Row> expectedResults = Arrays.asList(
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t1", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t2", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t3", "count", 4L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t4", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t5", "count", 4L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t6", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t7", "count", 2L)
    );

    TestHelper.assertExpectedObjects(expectedResults, Sequences.toList(result, new ArrayList<Row>()), "");

    result = helper.runQueryOnSegmentsObjs(
        ImmutableList.of(
            new QueryableIndexSegment("sid1", queryableIndex),
            new IncrementalIndexSegment(incrementalIndex, "sid2")
        ),
        query
    );
  }

  @Test
  public void testGroupByWithDimFilter() throws Exception
  {
    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource("xx")
        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
        .setGranularity(QueryGranularity.ALL)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("tags", "tags")))
        .setAggregatorSpecs(
            Arrays.asList(
                new AggregatorFactory[]
                    {
                        new CountAggregatorFactory("count")
                    }
            )
        )
        .setDimFilter(
            new SelectorDimFilter("tags", "t3")
        )
        .build();

    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
        ImmutableList.of(
            new QueryableIndexSegment("sid1", queryableIndex),
            new IncrementalIndexSegment(incrementalIndex, "sid2")
        ),
        query
    );

    List<Row> expectedResults = Arrays.asList(
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t1", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t2", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t3", "count", 4L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t4", "count", 2L),
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t5", "count", 2L)
    );

    TestHelper.assertExpectedObjects(expectedResults, Sequences.toList(result, new ArrayList<Row>()), "");
  }

  @Test
  public void testGroupByWithDimFilterAndWithFilteredDimSpec() throws Exception
  {
    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource("xx")
        .setQuerySegmentSpec(new LegacySegmentSpec("1970/3000"))
        .setGranularity(QueryGranularity.ALL)
        .setDimensions(
            Lists.<DimensionSpec>newArrayList(
                new RegexFilteredDimensionSpec(
                    new DefaultDimensionSpec("tags", "tags"),
                    "t3"
                )
            )
        )
        .setAggregatorSpecs(
            Arrays.asList(
                new AggregatorFactory[]
                    {
                        new CountAggregatorFactory("count")
                    }
            )
        )
        .setDimFilter(
            new SelectorDimFilter("tags", "t3")
        )
        .build();

    Sequence<Row> result = helper.runQueryOnSegmentsObjs(
        ImmutableList.of(
            new QueryableIndexSegment("sid1", queryableIndex),
            new IncrementalIndexSegment(incrementalIndex, "sid2")
        ),
        query
    );

    List<Row> expectedResults = Arrays.asList(
        GroupByQueryRunnerTestHelper.createExpectedRow("1970-01-01T00:00:00.000Z", "tags", "t3", "count", 4L)
    );

    TestHelper.assertExpectedObjects(expectedResults, Sequences.toList(result, new ArrayList<Row>()), "");
  }

  @AfterClass
  public static void cleanup() throws Exception
  {
    queryableIndex.close();
    incrementalIndex.close();
    FileUtils.deleteDirectory(persistedSegmentDir);
  }
}
```
AggregationTestHelper.java
@@ -296,12 +296,12 @@ public class AggregationTestHelper
```diff
 
   public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final GroupByQuery query)
   {
-    final List<QueryableIndexSegment> segments = Lists.transform(
+    final List<Segment> segments = Lists.transform(
         segmentDirs,
-        new Function<File, QueryableIndexSegment>()
+        new Function<File, Segment>()
         {
           @Override
-          public QueryableIndexSegment apply(File segmentDir)
+          public Segment apply(File segmentDir)
           {
             try {
               return new QueryableIndexSegment("", indexIO.loadIndex(segmentDir));
```
@@ -314,48 +314,52 @@ public class AggregationTestHelper
```diff
     );
 
     try {
-      final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner(
-          toolChest.postMergeQueryDecoration(
-              toolChest.mergeResults(
-                  toolChest.preMergeQueryDecoration(
-                      new ConcatQueryRunner(
-                          Sequences.simple(
-                              Lists.transform(
-                                  segments,
-                                  new Function<Segment, QueryRunner>()
-                                  {
-                                    @Override
-                                    public QueryRunner apply(final Segment segment)
-                                    {
-                                      try {
-                                        return makeStringSerdeQueryRunner(
-                                            mapper,
-                                            toolChest,
-                                            query,
-                                            factory.createRunner(segment)
-                                        );
-                                      }
-                                      catch (Exception ex) {
-                                        throw Throwables.propagate(ex);
-                                      }
-                                    }
-                                  }
-                              )
-                          )
-                      )
-                  )
-              )
-          ),
-          toolChest
-      );
-
-      return baseRunner.run(query, Maps.newHashMap());
+      return runQueryOnSegmentsObjs(segments, query);
     } finally {
       for (Segment segment : segments) {
         CloseQuietly.close(segment);
       }
     }
   }
 
+  public Sequence<Row> runQueryOnSegmentsObjs(final List<Segment> segments, final GroupByQuery query)
+  {
+    final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner(
+        toolChest.postMergeQueryDecoration(
+            toolChest.mergeResults(
+                toolChest.preMergeQueryDecoration(
+                    new ConcatQueryRunner(
+                        Sequences.simple(
+                            Lists.transform(
+                                segments,
+                                new Function<Segment, QueryRunner>()
+                                {
+                                  @Override
+                                  public QueryRunner apply(final Segment segment)
+                                  {
+                                    try {
+                                      return makeStringSerdeQueryRunner(
+                                          mapper,
+                                          toolChest,
+                                          query,
+                                          factory.createRunner(segment)
+                                      );
+                                    }
+                                    catch (Exception ex) {
+                                      throw Throwables.propagate(ex);
+                                    }
+                                  }
+                                }
+                            )
+                        )
+                    )
+                )
+            )
+        ),
+        toolChest
+    );
+
+    return baseRunner.run(query, Maps.newHashMap());
+  }
+
   public QueryRunner<Row> makeStringSerdeQueryRunner(final ObjectMapper mapper, final QueryToolChest toolChest, final Query<Row> query, final QueryRunner<Row> baseRunner)
```
ListFilteredDimensionSpecTest.java (new file)
@@ -0,0 +1,113 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.dimension;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

/**
 */
public class ListFilteredDimensionSpecTest
{
  @Test
  public void testSerde() throws Exception
  {
    ObjectMapper mapper = TestHelper.getObjectMapper();

    //isWhitelist = true
    String jsonStr = "{\n"
                     + "  \"type\": \"listFiltered\",\n"
                     + "  \"delegate\": {\n"
                     + "    \"type\": \"default\",\n"
                     + "    \"dimension\": \"foo\",\n"
                     + "    \"outputName\": \"bar\"\n"
                     + "  },\n"
                     + "  \"values\": [\"xxx\"]\n"
                     + "}";

    ListFilteredDimensionSpec actual = (ListFilteredDimensionSpec) mapper.readValue(
        mapper.writeValueAsString(mapper.readValue(jsonStr, DimensionSpec.class)),
        DimensionSpec.class
    );

    ListFilteredDimensionSpec expected = new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        ImmutableList.of("xxx"),
        true
    );

    Assert.assertEquals(expected, actual);

    //isWhitelist = false
    jsonStr = "{\n"
              + "  \"type\": \"listFiltered\",\n"
              + "  \"delegate\": {\n"
              + "    \"type\": \"default\",\n"
              + "    \"dimension\": \"foo\",\n"
              + "    \"outputName\": \"bar\"\n"
              + "  },\n"
              + "  \"values\": [\"xxx\"],\n"
              + "  \"isWhitelist\": false\n"
              + "}";

    actual = (ListFilteredDimensionSpec) mapper.readValue(
        mapper.writeValueAsString(mapper.readValue(jsonStr, DimensionSpec.class)),
        DimensionSpec.class
    );

    expected = new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        ImmutableList.of("xxx"),
        false
    );

    Assert.assertEquals(expected, actual);
  }

  @Test
  public void testGetCacheKey()
  {
    ListFilteredDimensionSpec spec1 = new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        ImmutableList.of("xxx"),
        null
    );

    ListFilteredDimensionSpec spec2 = new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        ImmutableList.of("xyz"),
        null
    );

    Assert.assertFalse(Arrays.equals(spec1.getCacheKey(), spec2.getCacheKey()));

    ListFilteredDimensionSpec spec3 = new ListFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        ImmutableList.of("xxx"),
        false
    );

    Assert.assertFalse(Arrays.equals(spec1.getCacheKey(), spec3.getCacheKey()));
  }
}
```
RegexFilteredDimensionSpecTest.java (new file)
@@ -0,0 +1,76 @@
```java
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.query.dimension;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;

/**
 */
public class RegexFilteredDimensionSpecTest
{
  @Test
  public void testSerde() throws Exception
  {
    ObjectMapper mapper = TestHelper.getObjectMapper();

    String jsonStr = "{\n"
                     + "  \"type\": \"regexFiltered\",\n"
                     + "  \"delegate\": {\n"
                     + "    \"type\": \"default\",\n"
                     + "    \"dimension\": \"foo\",\n"
                     + "    \"outputName\": \"bar\"\n"
                     + "  },\n"
                     + "  \"pattern\": \"xxx\"\n"
                     + "}";

    RegexFilteredDimensionSpec actual = (RegexFilteredDimensionSpec) mapper.readValue(
        mapper.writeValueAsString(mapper.readValue(jsonStr, DimensionSpec.class)),
        DimensionSpec.class
    );

    RegexFilteredDimensionSpec expected = new RegexFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        "xxx"
    );

    Assert.assertEquals(expected, actual);
  }

  @Test
  public void testGetCacheKey()
  {
    RegexFilteredDimensionSpec spec1 = new RegexFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        "xxx"
    );

    RegexFilteredDimensionSpec spec2 = new RegexFilteredDimensionSpec(
        new DefaultDimensionSpec("foo", "bar"),
        "xyz"
    );

    Assert.assertFalse(Arrays.equals(spec1.getCacheKey(), spec2.getCacheKey()));
  }
}
```