mirror of https://github.com/apache/druid.git
Merge pull request #804 from metamx/notdimfilter-support
Add support for NotDimFilter with FilteredAggregator
This commit is contained in:
commit
a0baf7e8f4
|
@ -169,7 +169,7 @@ A filtered aggregator wraps any given aggregator, but only aggregates the values
|
||||||
|
|
||||||
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
|
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
|
||||||
|
|
||||||
*Limitations:* The filtered aggregator currently only supports selector filters, i.e. matching a dimension against a single value.
|
*Limitations:* The filtered aggregator currently only supports selector and not filter with a single selector, i.e. matching a dimension against a single value.
|
||||||
|
|
||||||
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
|
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,10 @@ package io.druid.query.aggregation;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.metamx.common.ISE;
|
||||||
|
import com.metamx.common.Pair;
|
||||||
import io.druid.query.filter.DimFilter;
|
import io.druid.query.filter.DimFilter;
|
||||||
|
import io.druid.query.filter.NotDimFilter;
|
||||||
import io.druid.query.filter.SelectorDimFilter;
|
import io.druid.query.filter.SelectorDimFilter;
|
||||||
import io.druid.segment.ColumnSelectorFactory;
|
import io.druid.segment.ColumnSelectorFactory;
|
||||||
import io.druid.segment.DimensionSelector;
|
import io.druid.segment.DimensionSelector;
|
||||||
|
@ -36,7 +39,7 @@ public class FilteredAggregatorFactory implements AggregatorFactory
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
private final AggregatorFactory delegate;
|
private final AggregatorFactory delegate;
|
||||||
private final SelectorDimFilter filter;
|
private final DimFilter filter;
|
||||||
|
|
||||||
public FilteredAggregatorFactory(
|
public FilteredAggregatorFactory(
|
||||||
@JsonProperty("name") String name,
|
@JsonProperty("name") String name,
|
||||||
|
@ -46,45 +49,37 @@ public class FilteredAggregatorFactory implements AggregatorFactory
|
||||||
{
|
{
|
||||||
Preconditions.checkNotNull(delegate);
|
Preconditions.checkNotNull(delegate);
|
||||||
Preconditions.checkNotNull(filter);
|
Preconditions.checkNotNull(filter);
|
||||||
Preconditions.checkArgument(filter instanceof SelectorDimFilter, "FilteredAggregator currently only supports filters of type selector");
|
Preconditions.checkArgument(
|
||||||
|
filter instanceof SelectorDimFilter ||
|
||||||
|
(filter instanceof NotDimFilter && ((NotDimFilter) filter).getField() instanceof SelectorDimFilter),
|
||||||
|
"FilteredAggregator currently only supports filters of type 'selector' and their negation"
|
||||||
|
);
|
||||||
|
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
this.filter = (SelectorDimFilter)filter;
|
this.filter = filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||||
{
|
{
|
||||||
final Aggregator aggregator = delegate.factorize(metricFactory);
|
final Aggregator aggregator = delegate.factorize(metricFactory);
|
||||||
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(filter.getDimension());
|
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
|
||||||
final int lookupId = dimSelector.lookupId(filter.getValue());
|
filter,
|
||||||
final IntPredicate predicate = new IntPredicate()
|
metricFactory
|
||||||
{
|
);
|
||||||
@Override
|
return new FilteredAggregator(name, selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
|
||||||
public boolean apply(int value)
|
|
||||||
{
|
|
||||||
return lookupId == value;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return new FilteredAggregator(name, dimSelector, predicate, aggregator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||||
{
|
{
|
||||||
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
|
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
|
||||||
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(filter.getDimension());
|
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
|
||||||
final int lookupId = dimSelector.lookupId(filter.getValue());
|
filter,
|
||||||
final IntPredicate predicate = new IntPredicate()
|
metricFactory
|
||||||
{
|
);
|
||||||
@Override
|
return new FilteredBufferAggregator(selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
|
||||||
public boolean apply(int value)
|
|
||||||
{
|
|
||||||
return lookupId == value;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return new FilteredBufferAggregator(dimSelector, predicate, aggregator);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -177,4 +172,45 @@ public class FilteredAggregatorFactory implements AggregatorFactory
|
||||||
{
|
{
|
||||||
return delegate.getRequiredColumns();
|
return delegate.getRequiredColumns();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Pair<DimensionSelector, IntPredicate> makeFilterPredicate(
|
||||||
|
final DimFilter dimFilter,
|
||||||
|
final ColumnSelectorFactory metricFactory
|
||||||
|
)
|
||||||
|
{
|
||||||
|
final SelectorDimFilter selector;
|
||||||
|
if (dimFilter instanceof NotDimFilter) {
|
||||||
|
// we only support NotDimFilter with Selector filter
|
||||||
|
selector = (SelectorDimFilter) ((NotDimFilter) dimFilter).getField();
|
||||||
|
} else if (dimFilter instanceof SelectorDimFilter) {
|
||||||
|
selector = (SelectorDimFilter) dimFilter;
|
||||||
|
} else {
|
||||||
|
throw new ISE("Unsupported DimFilter type [%d]", dimFilter.getClass());
|
||||||
|
}
|
||||||
|
|
||||||
|
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(selector.getDimension());
|
||||||
|
final int lookupId = dimSelector.lookupId(selector.getValue());
|
||||||
|
final IntPredicate predicate;
|
||||||
|
if (dimFilter instanceof NotDimFilter) {
|
||||||
|
predicate = new IntPredicate()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean apply(int value)
|
||||||
|
{
|
||||||
|
return lookupId != value;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
predicate = new IntPredicate()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean apply(int value)
|
||||||
|
{
|
||||||
|
return lookupId == value;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return Pair.of(dimSelector, predicate);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.query.aggregation;
|
package io.druid.query.aggregation;
|
||||||
|
|
||||||
|
import io.druid.query.filter.NotDimFilter;
|
||||||
import io.druid.query.filter.SelectorDimFilter;
|
import io.druid.query.filter.SelectorDimFilter;
|
||||||
import io.druid.segment.ColumnSelectorFactory;
|
import io.druid.segment.ColumnSelectorFactory;
|
||||||
import io.druid.segment.DimensionSelector;
|
import io.druid.segment.DimensionSelector;
|
||||||
|
@ -51,7 +52,21 @@ public class FilteredAggregatorTest
|
||||||
);
|
);
|
||||||
|
|
||||||
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
|
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
|
||||||
new ColumnSelectorFactory()
|
makeColumnSelector(selector)
|
||||||
|
);
|
||||||
|
|
||||||
|
Assert.assertEquals("test", agg.getName());
|
||||||
|
|
||||||
|
double expectedFirst = new Float(values[0]).doubleValue();
|
||||||
|
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||||
|
double expectedThird = expectedSecond;
|
||||||
|
|
||||||
|
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ColumnSelectorFactory makeColumnSelector(final TestFloatColumnSelector selector){
|
||||||
|
|
||||||
|
return new ColumnSelectorFactory()
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||||
|
@ -127,7 +142,35 @@ public class FilteredAggregatorTest
|
||||||
{
|
{
|
||||||
throw new UnsupportedOperationException();
|
throw new UnsupportedOperationException();
|
||||||
}
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertValues(FilteredAggregator agg,TestFloatColumnSelector selector, double... expectedVals){
|
||||||
|
Assert.assertEquals(0.0d, agg.get());
|
||||||
|
Assert.assertEquals(0.0d, agg.get());
|
||||||
|
Assert.assertEquals(0.0d, agg.get());
|
||||||
|
for(double expectedVal : expectedVals){
|
||||||
|
aggregate(selector, agg);
|
||||||
|
Assert.assertEquals(expectedVal, agg.get());
|
||||||
|
Assert.assertEquals(expectedVal, agg.get());
|
||||||
|
Assert.assertEquals(expectedVal, agg.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAggregateWithNotFilter()
|
||||||
|
{
|
||||||
|
final float[] values = {0.15f, 0.27f};
|
||||||
|
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
|
||||||
|
|
||||||
|
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
|
||||||
|
"test",
|
||||||
|
new DoubleSumAggregatorFactory("billy", "value"),
|
||||||
|
new NotDimFilter(new SelectorDimFilter("dim", "b"))
|
||||||
|
);
|
||||||
|
|
||||||
|
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
|
||||||
|
makeColumnSelector(selector)
|
||||||
);
|
);
|
||||||
|
|
||||||
Assert.assertEquals("test", agg.getName());
|
Assert.assertEquals("test", agg.getName());
|
||||||
|
@ -135,21 +178,7 @@ public class FilteredAggregatorTest
|
||||||
double expectedFirst = new Float(values[0]).doubleValue();
|
double expectedFirst = new Float(values[0]).doubleValue();
|
||||||
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||||
double expectedThird = expectedSecond;
|
double expectedThird = expectedSecond;
|
||||||
|
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
|
||||||
|
}
|
||||||
|
|
||||||
Assert.assertEquals(0.0d, agg.get());
|
|
||||||
Assert.assertEquals(0.0d, agg.get());
|
|
||||||
Assert.assertEquals(0.0d, agg.get());
|
|
||||||
aggregate(selector, agg);
|
|
||||||
Assert.assertEquals(expectedFirst, agg.get());
|
|
||||||
Assert.assertEquals(expectedFirst, agg.get());
|
|
||||||
Assert.assertEquals(expectedFirst, agg.get());
|
|
||||||
aggregate(selector, agg);
|
|
||||||
Assert.assertEquals(expectedSecond, agg.get());
|
|
||||||
Assert.assertEquals(expectedSecond, agg.get());
|
|
||||||
Assert.assertEquals(expectedSecond, agg.get());
|
|
||||||
aggregate(selector, agg);
|
|
||||||
Assert.assertEquals(expectedThird, agg.get());
|
|
||||||
Assert.assertEquals(expectedThird, agg.get());
|
|
||||||
Assert.assertEquals(expectedThird, agg.get());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue