add support for not filter and add test

This commit is contained in:
nishantmonu51 2014-10-22 10:25:24 +05:30
parent 69ca6cd924
commit 6cb6ec39cf
3 changed files with 181 additions and 118 deletions

View File

@ -169,7 +169,7 @@ A filtered aggregator wraps any given aggregator, but only aggregates the values
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations. This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
*Limitations:* The filtered aggregator currently only supports selector filters, i.e. matching a dimension against a single value. *Limitations:* The filtered aggregator currently only supports selector and not filters, i.e. matching a dimension against a single value.
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data. *Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.

View File

@ -21,7 +21,9 @@ package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.metamx.common.Pair;
import io.druid.query.filter.DimFilter; import io.druid.query.filter.DimFilter;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.SelectorDimFilter; import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
@ -36,7 +38,7 @@ public class FilteredAggregatorFactory implements AggregatorFactory
private final String name; private final String name;
private final AggregatorFactory delegate; private final AggregatorFactory delegate;
private final SelectorDimFilter filter; private final DimFilter filter;
public FilteredAggregatorFactory( public FilteredAggregatorFactory(
@JsonProperty("name") String name, @JsonProperty("name") String name,
@ -46,45 +48,36 @@ public class FilteredAggregatorFactory implements AggregatorFactory
{ {
Preconditions.checkNotNull(delegate); Preconditions.checkNotNull(delegate);
Preconditions.checkNotNull(filter); Preconditions.checkNotNull(filter);
Preconditions.checkArgument(filter instanceof SelectorDimFilter, "FilteredAggregator currently only supports filters of type selector"); Preconditions.checkArgument(
filter instanceof SelectorDimFilter || filter instanceof NotDimFilter,
"FilteredAggregator currently only supports filters of type selector and not"
);
this.name = name; this.name = name;
this.delegate = delegate; this.delegate = delegate;
this.filter = (SelectorDimFilter)filter; this.filter = filter;
} }
@Override @Override
public Aggregator factorize(ColumnSelectorFactory metricFactory) public Aggregator factorize(ColumnSelectorFactory metricFactory)
{ {
final Aggregator aggregator = delegate.factorize(metricFactory); final Aggregator aggregator = delegate.factorize(metricFactory);
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(filter.getDimension()); final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
final int lookupId = dimSelector.lookupId(filter.getValue()); filter,
final IntPredicate predicate = new IntPredicate() metricFactory
{ );
@Override return new FilteredAggregator(name, selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
public boolean apply(int value)
{
return lookupId == value;
}
};
return new FilteredAggregator(name, dimSelector, predicate, aggregator);
} }
@Override @Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{ {
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory); final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(filter.getDimension()); final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
final int lookupId = dimSelector.lookupId(filter.getValue()); filter,
final IntPredicate predicate = new IntPredicate() metricFactory
{ );
@Override return new FilteredBufferAggregator(selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
public boolean apply(int value)
{
return lookupId == value;
}
};
return new FilteredBufferAggregator(dimSelector, predicate, aggregator);
} }
@Override @Override
@ -177,4 +170,45 @@ public class FilteredAggregatorFactory implements AggregatorFactory
{ {
return delegate.getRequiredColumns(); return delegate.getRequiredColumns();
} }
private static Pair<DimensionSelector, IntPredicate> makeFilterPredicate(
DimFilter dimFilter,
ColumnSelectorFactory metricFactory
)
{
if (dimFilter instanceof SelectorDimFilter) {
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(((SelectorDimFilter) dimFilter).getDimension());
final int lookupId = dimSelector.lookupId(((SelectorDimFilter) dimFilter).getValue());
return Pair.<DimensionSelector, IntPredicate>of(
dimSelector, new IntPredicate()
{
@Override
public boolean apply(int value)
{
return lookupId == value;
}
}
);
} else if (dimFilter instanceof NotDimFilter) {
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
((NotDimFilter) dimFilter).getField(),
metricFactory
);
return Pair.<DimensionSelector, IntPredicate>of(
selectorPredicatePair.lhs, new IntPredicate()
{
@Override
public boolean apply(int value)
{
return !selectorPredicatePair.rhs.apply(value);
}
}
);
} else {
throw new UnsupportedOperationException(
"FilteredAggregator does not support DimFilter of type "
+ dimFilter.getClass()
);
}
}
} }

View File

@ -19,6 +19,7 @@
package io.druid.query.aggregation; package io.druid.query.aggregation;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.SelectorDimFilter; import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory; import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelector;
@ -51,83 +52,7 @@ public class FilteredAggregatorTest
); );
FilteredAggregator agg = (FilteredAggregator) factory.factorize( FilteredAggregator agg = (FilteredAggregator) factory.factorize(
new ColumnSelectorFactory() makeColumnSelector(selector)
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
throw new UnsupportedOperationException();
}
@Override
public DimensionSelector makeDimensionSelector(String dimensionName)
{
if (dimensionName.equals("dim")) {
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
if (selector.getIndex() % 3 == 2) {
return new ArrayBasedIndexedInts(new int[]{1});
} else {
return new ArrayBasedIndexedInts(new int[]{0});
}
}
@Override
public int getValueCardinality()
{
return 2;
}
@Override
public String lookupName(int id)
{
switch (id) {
case 0:
return "a";
case 1:
return "b";
default:
throw new IllegalArgumentException();
}
}
@Override
public int lookupId(String name)
{
switch (name) {
case "a":
return 0;
case "b":
return 1;
default:
throw new IllegalArgumentException();
}
}
};
} else {
throw new UnsupportedOperationException();
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (columnName.equals("value")) {
return selector;
} else {
throw new UnsupportedOperationException();
}
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
throw new UnsupportedOperationException();
}
}
); );
Assert.assertEquals("test", agg.getName()); Assert.assertEquals("test", agg.getName());
@ -136,20 +61,124 @@ public class FilteredAggregatorTest
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst; double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
double expectedThird = expectedSecond; double expectedThird = expectedSecond;
Assert.assertEquals(0.0d, agg.get()); assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
Assert.assertEquals(0.0d, agg.get());
Assert.assertEquals(0.0d, agg.get());
aggregate(selector, agg);
Assert.assertEquals(expectedFirst, agg.get());
Assert.assertEquals(expectedFirst, agg.get());
Assert.assertEquals(expectedFirst, agg.get());
aggregate(selector, agg);
Assert.assertEquals(expectedSecond, agg.get());
Assert.assertEquals(expectedSecond, agg.get());
Assert.assertEquals(expectedSecond, agg.get());
aggregate(selector, agg);
Assert.assertEquals(expectedThird, agg.get());
Assert.assertEquals(expectedThird, agg.get());
Assert.assertEquals(expectedThird, agg.get());
} }
private ColumnSelectorFactory makeColumnSelector(final TestFloatColumnSelector selector){
return new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
throw new UnsupportedOperationException();
}
@Override
public DimensionSelector makeDimensionSelector(String dimensionName)
{
if (dimensionName.equals("dim")) {
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
if (selector.getIndex() % 3 == 2) {
return new ArrayBasedIndexedInts(new int[]{1});
} else {
return new ArrayBasedIndexedInts(new int[]{0});
}
}
@Override
public int getValueCardinality()
{
return 2;
}
@Override
public String lookupName(int id)
{
switch (id) {
case 0:
return "a";
case 1:
return "b";
default:
throw new IllegalArgumentException();
}
}
@Override
public int lookupId(String name)
{
switch (name) {
case "a":
return 0;
case "b":
return 1;
default:
throw new IllegalArgumentException();
}
}
};
} else {
throw new UnsupportedOperationException();
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (columnName.equals("value")) {
return selector;
} else {
throw new UnsupportedOperationException();
}
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
throw new UnsupportedOperationException();
}
};
}
private void assertValues(FilteredAggregator agg,TestFloatColumnSelector selector, double... expectedVals){
Assert.assertEquals(0.0d, agg.get());
Assert.assertEquals(0.0d, agg.get());
Assert.assertEquals(0.0d, agg.get());
for(double expectedVal : expectedVals){
aggregate(selector, agg);
Assert.assertEquals(expectedVal, agg.get());
Assert.assertEquals(expectedVal, agg.get());
Assert.assertEquals(expectedVal, agg.get());
}
}
@Test
public void testAggregateWithNotFilter()
{
final float[] values = {0.15f, 0.27f};
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
"test",
new DoubleSumAggregatorFactory("billy", "value"),
new NotDimFilter(new SelectorDimFilter("dim", "b"))
);
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
makeColumnSelector(selector)
);
Assert.assertEquals("test", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
double expectedThird = expectedSecond;
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
}
} }