Merge remote-tracking branch 'upstream/master'

Glenn Nethercutt 2014-10-29 07:41:54 -04:00
commit 12e8feba8d
37 changed files with 679 additions and 52 deletions

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -160,3 +160,28 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to
```json
{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
```
## Miscellaneous Aggregations
### Filtered Aggregator
A filtered aggregator wraps any given aggregator, but only aggregates the values for which the given dimension filter matches.
This makes it possible to compute the results of a filtered and an unfiltered aggregation simultaneously, without having to issue multiple queries, and use both results as part of post-aggregations.
*Limitations:* The filtered aggregator currently only supports 'selector' filters and their negation (a 'not' filter wrapping a single selector), i.e. matching a dimension against a single value.
*Note:* If only the filtered results are required, consider putting the filter on the query itself, which will be much faster since it does not require scanning all the data.
```json
{
  "type" : "filtered",
  "name" : "aggMatching",
  "filter" : {
    "type" : "selector",
    "dimension" : <dimension>,
    "value" : <dimension value>
  },
  "aggregator" : <aggregation>
}
```
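For example, to compute an overall count alongside the count of rows matching one dimension value in a single pass, a query's aggregations list could pair an ordinary aggregator with a filtered one (the aggregator names and dimension value below are purely illustrative):
```json
"aggregations" : [
  { "type" : "count", "name" : "rows" },
  {
    "type" : "filtered",
    "name" : "rowsMatchingA",
    "filter" : { "type" : "selector", "dimension" : "dim", "value" : "a" },
    "aggregator" : { "type" : "count", "name" : "rowsMatchingA" }
  }
]
```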

View File

@@ -19,13 +19,13 @@ Clone Druid and build it:
git clone https://github.com/metamx/druid.git druid
cd druid
git fetch --tags
git checkout druid-0.6.159
git checkout druid-0.6.160
./build.sh
```
### Downloading the DSK (Druid Standalone Kit)
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz) a stand-alone tarball and run it:
[Download](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz) a stand-alone tarball and run it:
``` bash
tar -xzf druid-services-0.X.X-bin.tar.gz

View File

@@ -8,9 +8,9 @@ The previous examples are for Kafka 7. To support Kafka 8, a couple changes need
- Update realtime node's configs for Kafka 8 extensions
- e.g.
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.159",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-seven:0.6.160",...]`
- becomes
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.159",...]`
- `druid.extensions.coordinates=[...,"io.druid.extensions:druid-kafka-eight:0.6.160",...]`
- Update realtime task config for changed keys
- `firehose.type`, `plumber.rejectionPolicyFactory`, and all of `firehose.consumerProps` change; a sketch of the updated firehose section follows below.
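As an illustrative sketch only (not part of this change set), a realtime task's firehose section under Kafka 8 might look roughly like the following; the exact firehose type name and consumer properties should be verified against the druid-kafka-eight extension, and all values shown are placeholders:
```json
"firehose" : {
  "type" : "kafka-0.8",
  "consumerProps" : {
    "zookeeper.connect" : "localhost:2181",
    "group.id" : "druid-example",
    "auto.offset.reset" : "largest"
  },
  "feed" : "wikipedia"
}
```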

View File

@@ -57,7 +57,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/overlord
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -139,7 +139,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/middlemanager
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod
@@ -286,7 +286,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/historical
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@@ -27,7 +27,7 @@ druid.host=localhost
druid.service=realtime
druid.port=8083
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.zk.service.host=localhost
@@ -76,7 +76,7 @@ druid.host=#{IP_ADDR}:8080
druid.port=8080
druid.service=druid/prod/realtime
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@@ -51,7 +51,7 @@ druid.service=druid/prod/router
druid.extensions.remoteRepositories=[]
druid.extensions.localRepository=lib
druid.extensions.coordinates=["io.druid.extensions:druid-histogram:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-histogram:0.6.160"]
druid.zk.service.host=#{ZK_IPs}
druid.zk.paths.base=/druid/prod

View File

@@ -28,7 +28,7 @@ Configuration:
-Ddruid.zk.service.host=localhost
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
-Ddruid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
-Ddruid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
-Ddruid.db.connector.user=druid

View File

@@ -49,7 +49,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
### Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz). Download this file to a directory of your choosing.
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz). Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -60,7 +60,7 @@ tar -zxvf druid-services-*-bin.tar.gz
Not too lost so far, right? That's great! If you cd into the directory:
```
cd druid-services-0.6.159
cd druid-services-0.6.160
```
You should see a bunch of files:

View File

@@ -91,7 +91,7 @@ druid.service=overlord
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-kafka-seven:0.6.160"]
druid.db.connector.connectURI=jdbc:mysql://localhost:3306/druid
druid.db.connector.user=druid

View File

@@ -13,7 +13,7 @@ In this tutorial, we will set up other types of Druid nodes and external depende
If you followed the first tutorial, you should already have Druid downloaded. If not, let's go back and do that first.
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz)
You can download the latest version of druid [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz)
and untar the contents within by issuing:
@@ -149,7 +149,7 @@ druid.port=8081
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-s3-extensions:0.6.160"]
# Dummy read only AWS account (used to download example data)
druid.s3.secretKey=QyyfVZ7llSiRg6Qcrql1eEUG7buFpAK6T6engr1b
@@ -240,7 +240,7 @@ druid.port=8083
druid.zk.service.host=localhost
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.159","io.druid.extensions:druid-kafka-seven:0.6.159"]
druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.6.160","io.druid.extensions:druid-kafka-seven:0.6.160"]
# Change this config to db to hand off to the rest of the Druid cluster
druid.publish.type=noop

View File

@@ -37,7 +37,7 @@ There are two ways to setup Druid: download a tarball, or [Build From Source](Bu
h3. Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz)
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz)
Download this file to a directory of your choosing.
You can extract the awesomeness within by issuing:
@@ -48,7 +48,7 @@ tar zxvf druid-services-*-bin.tar.gz
Not too lost so far, right? That's great! If you cd into the directory:
```
cd druid-services-0.6.159
cd druid-services-0.6.160
```
You should see a bunch of files:

View File

@@ -9,7 +9,7 @@ There are two ways to setup Druid: download a tarball, or build it from source.
# Download a Tarball
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.159-bin.tar.gz).
We've built a tarball that contains everything you'll need. You'll find it [here](http://static.druid.io/artifacts/releases/druid-services-0.6.160-bin.tar.gz).
Download this bad boy to a directory of your choosing.
You can extract the awesomeness within by issuing:

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -289,4 +289,4 @@ public class HadoopIngestionSpec extends IngestionSpec<HadoopIOConfig, HadoopTun
null
);
}
}
}

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -23,7 +23,7 @@
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
<scm>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -26,6 +26,7 @@ import com.google.common.hash.Hashing;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.aggregation.HistogramAggregatorFactory;
import io.druid.query.aggregation.JavaScriptAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@@ -68,7 +69,8 @@ public class AggregatorsModule extends SimpleModule
@JsonSubTypes.Type(name = "javascript", value = JavaScriptAggregatorFactory.class),
@JsonSubTypes.Type(name = "histogram", value = HistogramAggregatorFactory.class),
@JsonSubTypes.Type(name = "hyperUnique", value = HyperUniquesAggregatorFactory.class),
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class)
@JsonSubTypes.Type(name = "cardinality", value = CardinalityAggregatorFactory.class),
@JsonSubTypes.Type(name = "filtered", value = FilteredAggregatorFactory.class)
})
public static interface AggregatorFactoryMixin
{

View File

@@ -47,14 +47,14 @@ import java.util.concurrent.TimeoutException;
/**
* A QueryRunner that combines a list of other QueryRunners and executes them in parallel on an executor.
*
* <p/>
* When using this, it is important to make sure that the list of QueryRunners provided is fully flattened.
* If, for example, you were to pass a list containing a chained QueryRunner (A) and a non-chained QueryRunner (B),
* where A has two QueryRunners chained together (Aa and Ab), the fact that the Queryables are run in parallel on an
* executor would mean that the Queryables are actually processed in the order
*
* <p/>
* <pre>A -&gt; B -&gt; Aa -&gt; Ab</pre>
*
* <p/>
* That is, the two sub queryables for A would run *after* B is run, effectively meaning that the results for B
* must be fully cached in memory before the results for Aa and Ab are computed.
*/
@@ -113,6 +113,10 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
@Override
public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
{
if (input == null) {
throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
}
return exec.submit(
new AbstractPrioritizedCallable<Iterable<T>>(priority)
{
@@ -120,10 +124,6 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
public Iterable<T> call() throws Exception
{
try {
if (input == null) {
throw new ISE("Input is null?! How is this possible?!");
}
Sequence<T> result = input.run(query);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
@@ -155,7 +155,7 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
queryWatcher.registerQuery(query, futures);
try {
final Number timeout = query.getContextValue("timeout", (Number)null);
final Number timeout = query.getContextValue("timeout", (Number) null);
return new MergeIterable<>(
ordering.nullsFirst(),
timeout == null ?
@@ -168,10 +168,10 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
}
catch(CancellationException e) {
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
}
catch(TimeoutException e) {
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");

View File

@@ -29,6 +29,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.Sequence;
@@ -81,9 +82,6 @@ public class GroupByParallelQueryRunner<T> implements QueryRunner<T>
final boolean bySegment = query.getContextBySegment(false);
final int priority = query.getContextPriority(0);
if (Iterables.isEmpty(queryables)) {
log.warn("No queryables found.");
}
ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
@@ -93,6 +91,10 @@
@Override
public ListenableFuture<Void> apply(final QueryRunner<T> input)
{
if (input == null) {
throw new ISE("Null queryRunner! Looks to be some segment unmapping action happening");
}
return exec.submit(
new AbstractPrioritizedCallable<Void>(priority)
{

View File

@@ -0,0 +1,86 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
public class FilteredAggregator implements Aggregator
{
private final String name;
private final DimensionSelector dimSelector;
private final Aggregator delegate;
private final IntPredicate predicate;
public FilteredAggregator(String name, DimensionSelector dimSelector, IntPredicate predicate, Aggregator delegate)
{
this.name = name;
this.dimSelector = dimSelector;
this.delegate = delegate;
this.predicate = predicate;
}
@Override
public void aggregate()
{
final IndexedInts row = dimSelector.getRow();
final int size = row.size();
for (int i = 0; i < size; ++i) {
if (predicate.apply(row.get(i))) {
delegate.aggregate();
break;
}
}
}
@Override
public void reset()
{
delegate.reset();
}
@Override
public Object get()
{
return delegate.get();
}
@Override
public float getFloat()
{
return delegate.getFloat();
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
delegate.close();
}
}

View File

@@ -0,0 +1,216 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
public class FilteredAggregatorFactory implements AggregatorFactory
{
private static final byte CACHE_TYPE_ID = 0x9;
private final String name;
private final AggregatorFactory delegate;
private final DimFilter filter;
public FilteredAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("aggregator") AggregatorFactory delegate,
@JsonProperty("filter") DimFilter filter
)
{
Preconditions.checkNotNull(delegate);
Preconditions.checkNotNull(filter);
Preconditions.checkArgument(
filter instanceof SelectorDimFilter ||
(filter instanceof NotDimFilter && ((NotDimFilter) filter).getField() instanceof SelectorDimFilter),
"FilteredAggregator currently only supports filters of type 'selector' and their negation"
);
this.name = name;
this.delegate = delegate;
this.filter = filter;
}
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
final Aggregator aggregator = delegate.factorize(metricFactory);
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
filter,
metricFactory
);
return new FilteredAggregator(name, selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
}
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
final BufferAggregator aggregator = delegate.factorizeBuffered(metricFactory);
final Pair<DimensionSelector, IntPredicate> selectorPredicatePair = makeFilterPredicate(
filter,
metricFactory
);
return new FilteredBufferAggregator(selectorPredicatePair.lhs, selectorPredicatePair.rhs, aggregator);
}
@Override
public Comparator getComparator()
{
return delegate.getComparator();
}
@Override
public Object combine(Object lhs, Object rhs)
{
return delegate.combine(lhs, rhs);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return delegate.getCombiningFactory();
}
@Override
public Object deserialize(Object object)
{
return delegate.deserialize(object);
}
@Override
public Object finalizeComputation(Object object)
{
return delegate.finalizeComputation(object);
}
@JsonProperty
@Override
public String getName()
{
return name;
}
@Override
public List<String> requiredFields()
{
return delegate.requiredFields();
}
@Override
public byte[] getCacheKey()
{
byte[] filterCacheKey = filter.getCacheKey();
byte[] aggregatorCacheKey = delegate.getCacheKey();
return ByteBuffer.allocate(1 + filterCacheKey.length + aggregatorCacheKey.length)
.put(CACHE_TYPE_ID)
.put(filterCacheKey)
.put(aggregatorCacheKey)
.array();
}
@Override
public String getTypeName()
{
return delegate.getTypeName();
}
@Override
public int getMaxIntermediateSize()
{
return delegate.getMaxIntermediateSize();
}
@Override
public Object getAggregatorStartValue()
{
return delegate.getAggregatorStartValue();
}
@JsonProperty
public AggregatorFactory getAggregator()
{
return delegate;
}
@JsonProperty
public DimFilter getFilter()
{
return filter;
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return delegate.getRequiredColumns();
}
private static Pair<DimensionSelector, IntPredicate> makeFilterPredicate(
final DimFilter dimFilter,
final ColumnSelectorFactory metricFactory
)
{
final SelectorDimFilter selector;
if (dimFilter instanceof NotDimFilter) {
// we only support NotDimFilter with Selector filter
selector = (SelectorDimFilter) ((NotDimFilter) dimFilter).getField();
} else if (dimFilter instanceof SelectorDimFilter) {
selector = (SelectorDimFilter) dimFilter;
} else {
throw new ISE("Unsupported DimFilter type [%d]", dimFilter.getClass());
}
final DimensionSelector dimSelector = metricFactory.makeDimensionSelector(selector.getDimension());
final int lookupId = dimSelector.lookupId(selector.getValue());
final IntPredicate predicate;
if (dimFilter instanceof NotDimFilter) {
predicate = new IntPredicate()
{
@Override
public boolean apply(int value)
{
return lookupId != value;
}
};
} else {
predicate = new IntPredicate()
{
@Override
public boolean apply(int value)
{
return lookupId == value;
}
};
}
return Pair.of(dimSelector, predicate);
}
}

View File

@@ -0,0 +1,79 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
public class FilteredBufferAggregator implements BufferAggregator
{
private final DimensionSelector dimSelector;
private final IntPredicate predicate;
private final BufferAggregator delegate;
public FilteredBufferAggregator(DimensionSelector dimSelector, IntPredicate predicate, BufferAggregator delegate)
{
this.dimSelector = dimSelector;
this.predicate = predicate;
this.delegate = delegate;
}
@Override
public void init(ByteBuffer buf, int position)
{
delegate.init(buf, position);
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
final IndexedInts row = dimSelector.getRow();
final int size = row.size();
for (int i = 0; i < size; ++i) {
if (predicate.apply(row.get(i))) {
delegate.aggregate(buf, position);
break;
}
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
return delegate.get(buf, position);
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
return delegate.getFloat(buf, position);
}
@Override
public void close()
{
delegate.close();
}
}

View File

@@ -0,0 +1,29 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
/**
* can be replaced with http://docs.oracle.com/javase/8/docs/api/java/util/function/IntPredicate.html
* when druid moves to java 8.
*/
public interface IntPredicate
{
boolean apply(int value);
}

View File

@@ -0,0 +1,184 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.aggregation;
import io.druid.query.filter.NotDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.TimestampColumnSelector;
import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.IndexedInts;
import org.junit.Assert;
import org.junit.Test;
public class FilteredAggregatorTest
{
private void aggregate(TestFloatColumnSelector selector, FilteredAggregator agg)
{
agg.aggregate();
selector.increment();
}
@Test
public void testAggregate()
{
final float[] values = {0.15f, 0.27f};
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
"test",
new DoubleSumAggregatorFactory("billy", "value"),
new SelectorDimFilter("dim", "a")
);
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
makeColumnSelector(selector)
);
Assert.assertEquals("test", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
double expectedThird = expectedSecond;
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
}
private ColumnSelectorFactory makeColumnSelector(final TestFloatColumnSelector selector)
{
return new ColumnSelectorFactory()
{
@Override
public TimestampColumnSelector makeTimestampColumnSelector()
{
throw new UnsupportedOperationException();
}
@Override
public DimensionSelector makeDimensionSelector(String dimensionName)
{
if (dimensionName.equals("dim")) {
return new DimensionSelector()
{
@Override
public IndexedInts getRow()
{
if (selector.getIndex() % 3 == 2) {
return new ArrayBasedIndexedInts(new int[]{1});
} else {
return new ArrayBasedIndexedInts(new int[]{0});
}
}
@Override
public int getValueCardinality()
{
return 2;
}
@Override
public String lookupName(int id)
{
switch (id) {
case 0:
return "a";
case 1:
return "b";
default:
throw new IllegalArgumentException();
}
}
@Override
public int lookupId(String name)
{
switch (name) {
case "a":
return 0;
case "b":
return 1;
default:
throw new IllegalArgumentException();
}
}
};
} else {
throw new UnsupportedOperationException();
}
}
@Override
public FloatColumnSelector makeFloatColumnSelector(String columnName)
{
if (columnName.equals("value")) {
return selector;
} else {
throw new UnsupportedOperationException();
}
}
@Override
public ObjectColumnSelector makeObjectColumnSelector(String columnName)
{
throw new UnsupportedOperationException();
}
};
}
private void assertValues(FilteredAggregator agg, TestFloatColumnSelector selector, double... expectedVals)
{
Assert.assertEquals(0.0d, agg.get());
Assert.assertEquals(0.0d, agg.get());
Assert.assertEquals(0.0d, agg.get());
for (double expectedVal : expectedVals) {
aggregate(selector, agg);
Assert.assertEquals(expectedVal, agg.get());
Assert.assertEquals(expectedVal, agg.get());
Assert.assertEquals(expectedVal, agg.get());
}
}
@Test
public void testAggregateWithNotFilter()
{
final float[] values = {0.15f, 0.27f};
final TestFloatColumnSelector selector = new TestFloatColumnSelector(values);
FilteredAggregatorFactory factory = new FilteredAggregatorFactory(
"test",
new DoubleSumAggregatorFactory("billy", "value"),
new NotDimFilter(new SelectorDimFilter("dim", "b"))
);
FilteredAggregator agg = (FilteredAggregator) factory.factorize(
makeColumnSelector(selector)
);
Assert.assertEquals("test", agg.getName());
double expectedFirst = new Float(values[0]).doubleValue();
double expectedSecond = new Float(values[1]).doubleValue() + expectedFirst;
double expectedThird = expectedSecond;
assertValues(agg, selector, expectedFirst, expectedSecond, expectedThird);
}
}

View File

@@ -44,4 +44,9 @@ public class TestFloatColumnSelector implements FloatColumnSelector
{
++index;
}
public int getIndex()
{
return index;
}
}

View File

@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-rabbitmq</artifactId>
@@ -10,7 +9,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -28,7 +28,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -27,7 +27,7 @@
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.6.160-SNAPSHOT</version>
<version>0.6.161-SNAPSHOT</version>
</parent>
<dependencies>