Filtered Aggregator fixes + enhancements

- fix NPE on IncrementIndex
- refactor code to support AND, OR filter
- tests for AND & OR filter
- handling for missing column / null values
This commit is contained in:
nishantmonu51 2014-11-19 17:42:37 +05:30 committed by Xavier Léauté
parent 2f08ab85fc
commit e3260aa177
16 changed files with 489 additions and 112 deletions

View File

@ -169,14 +169,13 @@ A filtered aggregator wraps any given aggregator, but only aggregates the values
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations. This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
*Limitations:* The filtered aggregator currently only supports selector and not filter with a single selector, i.e. matching a dimension against a single value. *Limitations:* The filtered aggregator currently only supports 'or', 'and', 'selector' and 'not' filters, i.e. matching one or multiple dimensions against a single value.
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data. *Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
```json ```json
{ {
"type" : "filtered", "type" : "filtered",
"name" : "aggMatching",
"filter" : { "filter" : {
"type" : "selector", "type" : "selector",
"dimension" : <dimension>, "dimension" : <dimension>,

View File

@ -19,38 +19,24 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import com.google.common.base.Predicate; import io.druid.query.filter.ValueMatcher;
import com.google.common.collect.Iterables;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
public class FilteredAggregator implements Aggregator public class FilteredAggregator implements Aggregator
{ {
private final String name; private final ValueMatcher matcher;
private final DimensionSelector dimSelector;
private final Aggregator delegate; private final Aggregator delegate;
private final IntPredicate predicate;
public FilteredAggregator(String name, DimensionSelector dimSelector, IntPredicate predicate, Aggregator delegate) public FilteredAggregator(ValueMatcher matcher, Aggregator delegate)
{ {
this.name = name; this.matcher = matcher;
this.dimSelector = dimSelector;
this.delegate = delegate; this.delegate = delegate;
this.predicate = predicate;
} }
@Override @Override
public void aggregate() public void aggregate()
{ {
final IndexedInts row = dimSelector.getRow(); if (matcher.matches()) {
final int size = row.size(); delegate.aggregate();
for (int i = 0; i < size; ++i) {
if (predicate.apply(row.get(i))) {
delegate.aggregate();
break;
}
} }
} }
@ -75,7 +61,7 @@ public class FilteredAggregator implements Aggregator
@Override @Override
public String getName() public String getName()
{ {
return name; return delegate.getName();
} }
@Override @Override

View File

@ -21,13 +21,10 @@ package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import io.druid.query.filter.NotDimFilter; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector; import io.druid.segment.filter.Filters;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Comparator; import java.util.Comparator;
@ -37,49 +34,40 @@ public class FilteredAggregatorFactory implements AggregatorFactory
{ {
private static final byte CACHE_TYPE_ID = 0x9; private static final byte CACHE_TYPE_ID = 0x9;
private final String name;
private final AggregatorFactory delegate; private final AggregatorFactory delegate;
private final DimFilter filter; private final DimFilter filter;
public FilteredAggregatorFactory( public FilteredAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("aggregator") AggregatorFactory delegate, @JsonProperty("aggregator") AggregatorFactory delegate,
@JsonProperty("filter") DimFilter filter @JsonProperty("filter") DimFilter filter
) )
{ {
Preconditions.checkNotNull(delegate); Preconditions.checkNotNull(delegate);
Preconditions.checkNotNull(filter); Preconditions.checkNotNull(filter);
Preconditions.checkArgument(
filter instanceof SelectorDimFilter ||
(filter instanceof NotDimFilter && ((NotDimFilter) filter).getField() instanceof SelectorDimFilter),
"FilteredAggregator currently only supports filters of type 'selector' and their negation"
);
this.name = name;
this.delegate = delegate; this.delegate = delegate;
this.filter = filter; this.filter = filter;
} }
@Override @Override
public Aggregator factorize(ColumnSelectorFactory metricFactory) public Aggregator factorize(ColumnSelectorFactory columnSelectorFactory)
{ {
final Aggregator aggregator = delegate.factorize(metricFactory); final ValueMatcher valueMatcher = Filters.convertDimensionFilters(filter).makeMatcher(columnSelectorFactory);
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate( return new FilteredAggregator(
filter, valueMatcher,
metricFactory delegate.factorize(columnSelectorFactory)
); );
return new FilteredAggregator(name, selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
} }
@Override @Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnSelectorFactory)
{ {
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory); final ValueMatcher valueMatcher = Filters.convertDimensionFilters(filter).makeMatcher(columnSelectorFactory);
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate( return new FilteredBufferAggregator(
filter, valueMatcher,
metricFactory delegate.factorizeBuffered(columnSelectorFactory)
); );
return new FilteredBufferAggregator(selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
} }
@Override @Override
@ -116,7 +104,7 @@ public class FilteredAggregatorFactory implements AggregatorFactory
@Override @Override
public String getName() public String getName()
{ {
return name; return delegate.getName();
} }
@Override @Override
@ -173,44 +161,42 @@ public class FilteredAggregatorFactory implements AggregatorFactory
return delegate.getRequiredColumns(); return delegate.getRequiredColumns();
} }
private static Pair<DimensionSelector, IntPredicate> makeFilterPredicate( @Override
final DimFilter dimFilter, public String toString()
final ColumnSelectorFactory metricFactory
)
{ {
final SelectorDimFilter selector; return "FilteredAggregatorFactory{" +
if (dimFilter instanceof NotDimFilter) { ", delegate=" + delegate +
// we only support NotDimFilter with Selector filter ", filter=" + filter +
selector = (SelectorDimFilter) ((NotDimFilter) dimFilter).getField(); '}';
} else if (dimFilter instanceof SelectorDimFilter) {
selector = (SelectorDimFilter) dimFilter;
} else {
throw new ISE("Unsupported DimFilter type [%d]", dimFilter.getClass());
}
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(selector.getDimension());
final int lookupId = dimSelector.lookupId(selector.getValue());
final IntPredicate predicate;
if (dimFilter instanceof NotDimFilter) {
predicate = new IntPredicate()
{
@Override
public boolean apply(int value)
{
return lookupId != value;
}
};
} else {
predicate = new IntPredicate()
{
@Override
public boolean apply(int value)
{
return lookupId == value;
}
};
}
return Pair.of(dimSelector, predicate);
} }
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
FilteredAggregatorFactory that = (FilteredAggregatorFactory) o;
if (delegate != null ? !delegate.equals(that.delegate) : that.delegate != null) {
return false;
}
if (filter != null ? !filter.equals(that.filter) : that.filter != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = delegate != null ? delegate.hashCode() : 0;
result = 31 * result + (filter != null ? filter.hashCode() : 0);
return result;
}
} }

View File

@ -19,6 +19,7 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import io.druid.query.filter.ValueMatcher;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts; import io.druid.segment.data.IndexedInts;
@ -26,14 +27,12 @@ import java.nio.ByteBuffer;
public class FilteredBufferAggregator implements BufferAggregator public class FilteredBufferAggregator implements BufferAggregator
{ {
private final DimensionSelector dimSelector; private final ValueMatcher matcher;
private final IntPredicate predicate;
private final BufferAggregator delegate; private final BufferAggregator delegate;
public FilteredBufferAggregator(DimensionSelector dimSelector, IntPredicate predicate, BufferAggregator delegate) public FilteredBufferAggregator(ValueMatcher matcher, BufferAggregator delegate)
{ {
this.dimSelector = dimSelector; this.matcher = matcher;
this.predicate = predicate;
this.delegate = delegate; this.delegate = delegate;
} }
@ -46,13 +45,8 @@ public class FilteredBufferAggregator implements BufferAggregator
@Override @Override
public void aggregate(ByteBuffer buf, int position) public void aggregate(ByteBuffer buf, int position)
{ {
final IndexedInts row = dimSelector.getRow(); if (matcher.matches()) {
final int size = row.size(); delegate.aggregate(buf, position);
for (int i = 0; i < size; ++i) {
if (predicate.apply(row.get(i))) {
delegate.aggregate(buf, position);
break;
}
} }
} }

View File

@ -20,6 +20,7 @@
package io.druid.query.filter; package io.druid.query.filter;
import com.metamx.collections.bitmap.ImmutableBitmap; import com.metamx.collections.bitmap.ImmutableBitmap;
import io.druid.segment.ColumnSelectorFactory;
/** /**
*/ */
@ -27,4 +28,5 @@ public interface Filter
{ {
public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector); public ImmutableBitmap getBitmapIndex(BitmapIndexSelector selector);
public ValueMatcher makeMatcher(ValueMatcherFactory factory); public ValueMatcher makeMatcher(ValueMatcherFactory factory);
public ValueMatcher makeMatcher(ColumnSelectorFactory columnSelectorFactory);
} }

View File

@ -25,6 +25,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import java.util.List; import java.util.List;
@ -68,9 +69,23 @@ public class AndFilter implements Filter
for (int i = 0; i < filters.size(); i++) { for (int i = 0; i < filters.size(); i++) {
matchers[i] = filters.get(i).makeMatcher(factory); matchers[i] = filters.get(i).makeMatcher(factory);
} }
return makeMatcher(matchers);
}
if (matchers.length == 1) { public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
return matchers[0]; {
final ValueMatcher[] matchers = new ValueMatcher[filters.size()];
for (int i = 0; i < filters.size(); i++) {
matchers[i] = filters.get(i).makeMatcher(factory);
}
return makeMatcher(matchers);
}
private ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers)
{
if (baseMatchers.length == 1) {
return baseMatchers[0];
} }
return new ValueMatcher() return new ValueMatcher()
@ -78,7 +93,7 @@ public class AndFilter implements Filter
@Override @Override
public boolean matches() public boolean matches()
{ {
for (ValueMatcher matcher : matchers) { for (ValueMatcher matcher : baseMatchers) {
if (!matcher.matches()) { if (!matcher.matches()) {
return false; return false;
} }
@ -87,5 +102,4 @@ public class AndFilter implements Filter
} }
}; };
} }
} }

View File

@ -27,6 +27,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -76,4 +77,10 @@ class DimensionPredicateFilter implements Filter
{ {
return factory.makeValueMatcher(dimension, predicate); return factory.makeValueMatcher(dimension, predicate);
} }
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
throw new UnsupportedOperationException();
}
} }

View File

@ -26,6 +26,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import java.util.List; import java.util.List;
@ -75,4 +76,11 @@ public class ExtractionFilter implements Filter
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
throw new UnsupportedOperationException();
}
} }

View File

@ -27,6 +27,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.data.Indexed; import io.druid.segment.data.Indexed;
import org.mozilla.javascript.Context; import org.mozilla.javascript.Context;
import org.mozilla.javascript.Function; import org.mozilla.javascript.Function;
@ -160,4 +161,11 @@ public class JavaScriptFilter implements Filter
return script.hashCode(); return script.hashCode();
} }
} }
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
throw new UnsupportedOperationException();
}
} }

View File

@ -24,6 +24,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
/** /**
*/ */
@ -61,4 +62,19 @@ public class NotFilter implements Filter
} }
}; };
} }
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
final ValueMatcher baseMatcher = baseFilter.makeMatcher(factory);
return new ValueMatcher()
{
@Override
public boolean matches()
{
return !baseMatcher.matches();
}
};
}
} }

View File

@ -25,6 +25,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import java.util.List; import java.util.List;
@ -68,9 +69,22 @@ public class OrFilter implements Filter
for (int i = 0; i < filters.size(); i++) { for (int i = 0; i < filters.size(); i++) {
matchers[i] = filters.get(i).makeMatcher(factory); matchers[i] = filters.get(i).makeMatcher(factory);
} }
return makeMatcher(matchers);
}
if (matchers.length == 1) { public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
return matchers[0]; {
final ValueMatcher[] matchers = new ValueMatcher[filters.size()];
for (int i = 0; i < filters.size(); i++) {
matchers[i] = filters.get(i).makeMatcher(factory);
}
return makeMatcher(matchers);
}
private ValueMatcher makeMatcher(final ValueMatcher[] baseMatchers){
if (baseMatchers.length == 1) {
return baseMatchers[0];
} }
return new ValueMatcher() return new ValueMatcher()
@ -78,7 +92,7 @@ public class OrFilter implements Filter
@Override @Override
public boolean matches() public boolean matches()
{ {
for (ValueMatcher matcher : matchers) { for (ValueMatcher matcher : baseMatchers) {
if (matcher.matches()) { if (matcher.matches()) {
return true; return true;
} }
@ -87,4 +101,5 @@ public class OrFilter implements Filter
} }
}; };
} }
} }

View File

@ -20,10 +20,14 @@
package io.druid.segment.filter; package io.druid.segment.filter;
import com.metamx.collections.bitmap.ImmutableBitmap; import com.metamx.collections.bitmap.ImmutableBitmap;
import io.druid.query.aggregation.Aggregators;
import io.druid.query.filter.BitmapIndexSelector; import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
/** /**
*/ */
@ -52,4 +56,34 @@ public class SelectorFilter implements Filter
{ {
return factory.makeValueMatcher(dimension, value); return factory.makeValueMatcher(dimension, value);
} }
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory columnSelectorFactory)
{
final DimensionSelector dimensionSelector = columnSelectorFactory.makeDimensionSelector(dimension);
// Missing columns are treated the same way as selector.getBitmapIndex, always returning false
if (dimensionSelector == null) {
return new BooleanValueMatcher(false);
} else {
final int valueId = dimensionSelector.lookupId(value);
return new ValueMatcher()
{
@Override
public boolean matches()
{
final IndexedInts row = dimensionSelector.getRow();
final int size = row.size();
for (int i = 0; i < size; ++i) {
if (row.get(i) == valueId) {
return true;
}
}
return false;
}
};
}
}
} }

View File

@ -24,6 +24,7 @@ import io.druid.query.filter.BitmapIndexSelector;
import io.druid.query.filter.Filter; import io.druid.query.filter.Filter;
import io.druid.query.filter.ValueMatcher; import io.druid.query.filter.ValueMatcher;
import io.druid.query.filter.ValueMatcherFactory; import io.druid.query.filter.ValueMatcherFactory;
import io.druid.segment.ColumnSelectorFactory;
/** /**
*/ */
@ -59,4 +60,11 @@ public class SpatialFilter implements Filter
bound bound
); );
} }
@Override
public ValueMatcher makeMatcher(ColumnSelectorFactory factory)
{
throw new UnsupportedOperationException();
}
} }

View File

@ -837,7 +837,11 @@ public class IncrementalIndex implements Iterable<Row>, Closeable
public int getId(String value) public int getId(String value)
{ {
return falseIds.get(value); if (value == null) {
value = "";
}
final Integer id = falseIds.get(value);
return id == null ? -1 : id;
} }
public String getValue(int id) public String getValue(int id)

View File

@ -19,7 +19,11 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import com.google.common.collect.Lists;
import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.NotDimFilter; import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.SelectorDimFilter; import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
@ -46,7 +50,6 @@ public class FilteredAggregatorTest
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values); final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory( FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
"test",
new DoubleSumAggregatorFactory("billy", "value"), new DoubleSumAggregatorFactory("billy", "value"),
new SelectorDimFilter("dim", "a") new SelectorDimFilter("dim", "a")
); );
@ -55,7 +58,7 @@ public class FilteredAggregatorTest
makeColumnSelector(selector) makeColumnSelector(selector)
); );
Assert.assertEquals("test", agg.getName()); Assert.assertEquals("billy", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue(); double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst; double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
@ -164,7 +167,6 @@ public class FilteredAggregatorTest
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values); final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory( FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
"test",
new DoubleSumAggregatorFactory("billy", "value"), new DoubleSumAggregatorFactory("billy", "value"),
new NotDimFilter(new SelectorDimFilter("dim", "b")) new NotDimFilter(new SelectorDimFilter("dim", "b"))
); );
@ -173,7 +175,52 @@ public class FilteredAggregatorTest
makeColumnSelector(selector) makeColumnSelector(selector)
); );
Assert.assertEquals("test", agg.getName()); Assert.assertEquals("billy", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
double expectedThird = expectedSecond;
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
}
@Test
public void testAggregateWithOrFilter()
{
final float[] values = {0.15f, 0.27f, 0.14f};
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
new DoubleSumAggregatorFactory("billy", "value"),
new OrDimFilter(Lists.<DimFilter>newArrayList(new SelectorDimFilter("dim", "a"), new SelectorDimFilter("dim", "b")))
);
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
makeColumnSelector(selector)
);
Assert.assertEquals("billy", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
double expectedThird = expectedSecond + new Float(values[2]).doubleValue();
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
}
@Test
public void testAggregateWithAndFilter()
{
final float[] values = {0.15f, 0.27f};
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
new DoubleSumAggregatorFactory("billy", "value"),
new AndDimFilter(Lists.<DimFilter>newArrayList(new NotDimFilter(new SelectorDimFilter("dim", "b")), new SelectorDimFilter("dim", "a"))));
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
makeColumnSelector(selector)
);
Assert.assertEquals("billy", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue(); double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst; double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;

View File

@ -32,12 +32,15 @@ import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper; import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result; import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.MaxAggregatorFactory; import io.druid.query.aggregation.MaxAggregatorFactory;
import io.druid.query.aggregation.MinAggregatorFactory; import io.druid.query.aggregation.MinAggregatorFactory;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.AndDimFilter; import io.druid.query.filter.AndDimFilter;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.RegexDimFilter; import io.druid.query.filter.RegexDimFilter;
import io.druid.query.spec.MultipleIntervalSegmentSpec; import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.segment.TestHelper; import io.druid.segment.TestHelper;
@ -53,8 +56,8 @@ import org.junit.runners.Parameterized;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Map;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
*/ */
@ -1658,4 +1661,250 @@ public class TimeseriesQueryRunnerTest
); );
TestHelper.assertExpectedResults(expectedResults, actualResults); TestHelper.assertExpectedResults(expectedResults, actualResults);
} }
@Test
public void testTimeSeriesWithFilteredAgg()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new FilteredAggregatorFactory(
new CountAggregatorFactory("filteredAgg"),
Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.marketDimension)
.value("spot")
.build()
)
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"filteredAgg", 18L,
"addRowsIndexConstant", 12486.361190795898d,
"index", 12459.361190795898d,
"uniques", 9.019833517963864d,
"rows", 26L
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test
public void testTimeSeriesWithFilteredAggDimensionNotPresentNotNullValue()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new FilteredAggregatorFactory(
new CountAggregatorFactory("filteredAgg"),
Druids.newSelectorDimFilterBuilder()
.dimension("abraKaDabra")
.value("Lol")
.build()
)
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"filteredAgg", 0L,
"addRowsIndexConstant", 12486.361190795898d,
"index", 12459.361190795898d,
"uniques", 9.019833517963864d,
"rows", 26L
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test
public void testTimeSeriesWithFilteredAggDimensionNotPresentNullValue()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new FilteredAggregatorFactory(
new CountAggregatorFactory("filteredAgg"),
Druids.newSelectorDimFilterBuilder()
.dimension("abraKaDabra")
.value(null)
.build()
)
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"filteredAgg", 0L,
"addRowsIndexConstant", 12486.361190795898d,
"index", 12459.361190795898d,
"uniques", 9.019833517963864d,
"rows", 26L
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test
public void testTimeSeriesWithFilteredAggValueNotPresent()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new FilteredAggregatorFactory(
new CountAggregatorFactory("filteredAgg"),
new NotDimFilter(
Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.marketDimension)
.value("LolLol")
.build()
)
)
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"filteredAgg", 26L,
"addRowsIndexConstant", 12486.361190795898d,
"index", 12459.361190795898d,
"uniques", 9.019833517963864d,
"rows", 26L
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
@Test
public void testTimeSeriesWithFilteredAggNullValue()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource(QueryRunnerTestHelper.dataSource)
.granularity(QueryRunnerTestHelper.allGran)
.intervals(QueryRunnerTestHelper.firstToThird)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonAggregators,
Lists.newArrayList(
new FilteredAggregatorFactory(
new CountAggregatorFactory("filteredAgg"),
new NotDimFilter(
Druids.newSelectorDimFilterBuilder()
.dimension(QueryRunnerTestHelper.marketDimension)
.value(null)
.build()
)
)
)
)
)
)
.postAggregators(Arrays.<PostAggregator>asList(QueryRunnerTestHelper.addRowsIndexConstant))
.build();
Iterable<Result<TimeseriesResultValue>> actualResults = Sequences.toList(
runner.run(query, CONTEXT),
Lists.<Result<TimeseriesResultValue>>newArrayList()
);
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2011-04-01"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>of(
"filteredAgg", 26L,
"addRowsIndexConstant", 12486.361190795898d,
"index", 12459.361190795898d,
"uniques", 9.019833517963864d,
"rows", 26L
)
)
)
);
TestHelper.assertExpectedResults(expectedResults, actualResults);
}
} }