mirror of https://github.com/apache/druid.git
Merge pull request #803 from metamx/filtered-aggregator
Native filtered aggregator
This commit is contained in:
commit
69ca6cd924
|
@ -160,3 +160,28 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to
|
|||
```json
|
||||
{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
|
||||
```
|
||||
|
||||
## Miscellaneous Aggregations
|
||||
|
||||
### Filtered Aggregator
|
||||
|
||||
A filtered aggregator wraps any given aggregator, but only aggregates the values for which the given dimension filter matches.
|
||||
|
||||
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
|
||||
|
||||
*Limitations:* The filtered aggregator currently only supports selector filters, i.e. matching a dimension against a single value.
|
||||
|
||||
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
|
||||
|
||||
```json
|
||||
{
|
||||
"type" : "filtered",
|
||||
"name" : "aggMatching",
|
||||
"filter" : {
|
||||
"type" : "selector",
|
||||
"dimension" : <dimension>,
|
||||
"value" : <dimension value>
|
||||
}
|
||||
"aggregator" : <aggregation>
|
||||
}
|
||||
```
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.google.common.hash.Hashing;
|
|||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
|
||||
import io.druid.query.aggregation.FilteredAggregatorFactory;
|
||||
import io.druid.query.aggregation.HistogramAggregatorFactory;
|
||||
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
|
@ -68,7 +69,8 @@ public class AggregatorsModule extends SimpleModule
|
|||
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class)
|
||||
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class),
|
||||
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class)
|
||||
})
|
||||
public static interface AggregatorFactoryMixin
|
||||
{
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
public class FilteredAggregator implements Aggregator
|
||||
{
|
||||
private final String name;
|
||||
private final DimensionSelector dimSelector;
|
||||
private final Aggregator delegate;
|
||||
private final IntPredicate predicate;
|
||||
|
||||
public FilteredAggregator(String name, DimensionSelector dimSelector, IntPredicate predicate, Aggregator delegate)
|
||||
{
|
||||
this.name = name;
|
||||
this.dimSelector = dimSelector;
|
||||
this.delegate = delegate;
|
||||
this.predicate = predicate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
final IndexedInts row = dimSelector.getRow();
|
||||
final int size = row.size();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
if (predicate.apply(row.get(i))) {
|
||||
delegate.aggregate();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
delegate.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return delegate.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat()
|
||||
{
|
||||
return delegate.getFloat();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
delegate.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,180 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.druid.query.filter.DimFilter;
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
public class FilteredAggregatorFactory implements AggregatorFactory
|
||||
{
|
||||
private static final byte CACHE_TYPE_ID = 0x9;
|
||||
|
||||
private final String name;
|
||||
private final AggregatorFactory delegate;
|
||||
private final SelectorDimFilter filter;
|
||||
|
||||
public FilteredAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("aggregator") AggregatorFactory delegate,
|
||||
@JsonProperty("filter") DimFilter filter
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(delegate);
|
||||
Preconditions.checkNotNull(filter);
|
||||
Preconditions.checkArgument(filter instanceof SelectorDimFilter, "FilteredAggregator currently only supports filters of type selector");
|
||||
|
||||
this.name = name;
|
||||
this.delegate = delegate;
|
||||
this.filter = (SelectorDimFilter)filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
final Aggregator aggregator = delegate.factorize(metricFactory);
|
||||
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(filter.getDimension());
|
||||
final int lookupId = dimSelector.lookupId(filter.getValue());
|
||||
final IntPredicate predicate = new IntPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(int value)
|
||||
{
|
||||
return lookupId == value;
|
||||
}
|
||||
};
|
||||
return new FilteredAggregator(name, dimSelector, predicate, aggregator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
|
||||
{
|
||||
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
|
||||
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(filter.getDimension());
|
||||
final int lookupId = dimSelector.lookupId(filter.getValue());
|
||||
final IntPredicate predicate = new IntPredicate()
|
||||
{
|
||||
@Override
|
||||
public boolean apply(int value)
|
||||
{
|
||||
return lookupId == value;
|
||||
}
|
||||
};
|
||||
return new FilteredBufferAggregator(dimSelector, predicate, aggregator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
return delegate.getComparator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs)
|
||||
{
|
||||
return delegate.combine(lhs, rhs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return delegate.getCombiningFactory();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object)
|
||||
{
|
||||
return delegate.deserialize(object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return delegate.finalizeComputation(object);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields()
|
||||
{
|
||||
return delegate.requiredFields();
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
byte[] filterCacheKey = filter.getCacheKey();
|
||||
byte[] aggregatorCacheKey = delegate.getCacheKey();
|
||||
return ByteBuffer.allocate(1 + filterCacheKey.length + aggregatorCacheKey.length)
|
||||
.put(CACHE_TYPE_ID)
|
||||
.put(filterCacheKey)
|
||||
.put(aggregatorCacheKey)
|
||||
.array();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return delegate.getTypeName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize()
|
||||
{
|
||||
return delegate.getMaxIntermediateSize();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return delegate.getAggregatorStartValue();
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public AggregatorFactory getAggregator()
|
||||
{
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public DimFilter getFilter()
|
||||
{
|
||||
return filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns()
|
||||
{
|
||||
return delegate.getRequiredColumns();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import com.google.common.base.Predicate;
|
||||
import com.google.common.collect.Iterables;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class FilteredBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final DimensionSelector dimSelector;
|
||||
private final IntPredicate predicate;
|
||||
private final BufferAggregator delegate;
|
||||
|
||||
public FilteredBufferAggregator(DimensionSelector dimSelector, IntPredicate predicate, BufferAggregator delegate)
|
||||
{
|
||||
this.dimSelector = dimSelector;
|
||||
this.predicate = predicate;
|
||||
this.delegate = delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
delegate.init(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
final IndexedInts row = dimSelector.getRow();
|
||||
final int size = row.size();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
if (predicate.apply(row.get(i))) {
|
||||
delegate.aggregate(buf, position);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return delegate.get(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
return delegate.getFloat(buf, position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
delegate.close();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
/**
|
||||
* can be replaced with http://docs.oracle.com/javase/8/docs/api/java/util/function/IntPredicate.html
|
||||
* when druid moves to java 8.
|
||||
*/
|
||||
public interface IntPredicate
|
||||
{
|
||||
boolean apply(int value);
|
||||
}
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2014 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import io.druid.query.filter.SelectorDimFilter;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
import io.druid.segment.FloatColumnSelector;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import io.druid.segment.TimestampColumnSelector;
|
||||
import io.druid.segment.data.ArrayBasedIndexedInts;
|
||||
import io.druid.segment.data.IndexedInts;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class FilteredAggregatorTest
|
||||
{
|
||||
private void aggregate(TestFloatColumnSelector selector, FilteredAggregator agg)
|
||||
{
|
||||
agg.aggregate();
|
||||
selector.increment();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAggregate()
|
||||
{
|
||||
final float[] values = {0.15f, 0.27f};
|
||||
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
|
||||
|
||||
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
|
||||
"test",
|
||||
new DoubleSumAggregatorFactory("billy", "value"),
|
||||
new SelectorDimFilter("dim", "a")
|
||||
);
|
||||
|
||||
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
|
||||
new ColumnSelectorFactory()
|
||||
{
|
||||
@Override
|
||||
public TimestampColumnSelector makeTimestampColumnSelector()
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DimensionSelector makeDimensionSelector(String dimensionName)
|
||||
{
|
||||
if (dimensionName.equals("dim")) {
|
||||
return new DimensionSelector()
|
||||
{
|
||||
@Override
|
||||
public IndexedInts getRow()
|
||||
{
|
||||
if (selector.getIndex() % 3 == 2) {
|
||||
return new ArrayBasedIndexedInts(new int[]{1});
|
||||
} else {
|
||||
return new ArrayBasedIndexedInts(new int[]{0});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getValueCardinality()
|
||||
{
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String lookupName(int id)
|
||||
{
|
||||
switch (id) {
|
||||
case 0:
|
||||
return "a";
|
||||
case 1:
|
||||
return "b";
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int lookupId(String name)
|
||||
{
|
||||
switch (name) {
|
||||
case "a":
|
||||
return 0;
|
||||
case "b":
|
||||
return 1;
|
||||
default:
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
}
|
||||
};
|
||||
} else {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FloatColumnSelector makeFloatColumnSelector(String columnName)
|
||||
{
|
||||
if (columnName.equals("value")) {
|
||||
return selector;
|
||||
} else {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
|
||||
{
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
Assert.assertEquals("test", agg.getName());
|
||||
|
||||
double expectedFirst = new Float(values[0]).doubleValue();
|
||||
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
|
||||
double expectedThird = expectedSecond;
|
||||
|
||||
Assert.assertEquals(0.0d, agg.get());
|
||||
Assert.assertEquals(0.0d, agg.get());
|
||||
Assert.assertEquals(0.0d, agg.get());
|
||||
aggregate(selector, agg);
|
||||
Assert.assertEquals(expectedFirst, agg.get());
|
||||
Assert.assertEquals(expectedFirst, agg.get());
|
||||
Assert.assertEquals(expectedFirst, agg.get());
|
||||
aggregate(selector, agg);
|
||||
Assert.assertEquals(expectedSecond, agg.get());
|
||||
Assert.assertEquals(expectedSecond, agg.get());
|
||||
Assert.assertEquals(expectedSecond, agg.get());
|
||||
aggregate(selector, agg);
|
||||
Assert.assertEquals(expectedThird, agg.get());
|
||||
Assert.assertEquals(expectedThird, agg.get());
|
||||
Assert.assertEquals(expectedThird, agg.get());
|
||||
}
|
||||
}
|
|
@ -44,4 +44,9 @@ public class TestFloatColumnSelector implements FloatColumnSelector
|
|||
{
|
||||
++index;
|
||||
}
|
||||
|
||||
public int getIndex()
|
||||
{
|
||||
return index;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue