Merge pull request #350 from metamx/topN

Approximate TopN Query
This commit is contained in:
fjy 2014-01-08 15:58:52 -08:00
commit f8478d49d8
39 changed files with 5680 additions and 1 deletions

View File

@ -0,0 +1,45 @@
---
layout: doc_page
---
TopNMetricSpec
==================
The topN metric spec specifies how topN values should be sorted.
## Numeric TopNMetricSpec
The simplest metric specification is a String value indicating the metric to sort topN results by. They are included in a topN query with:
```json
"metric": <metric_value_string>
```
The metric field can also be given as a JSON object. The grammar for dimension values sorted by numeric value is shown below:
```json
"metric": {
"type": "numeric",
"metric": "<metric_value>"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|this indicates a numeric sort|yes|
|metric|the actual metric field in which results will be sorted by|yes|
## Lexicographic TopNMetricSpec
The grammar for dimension values sorted lexicographically is as follows:
```json
"metric": {
"type": "lexicographic",
"previousStop": "<previousStop_value>"
}
```
|property|description|required?|
|--------|-----------|---------|
|type|this indicates a lexicographic sort|yes|
|previousStop|the starting point of the lexicographic sort. For example, if a previousStop value is 'b', all values before 'b' are discarded. This field can be used to paginate through all the dimension values.|no|

119
docs/content/TopNQuery.md Normal file
View File

@ -0,0 +1,119 @@
---
layout: doc_page
---
TopN queries
==================
TopN queries return a sorted set of results for the values in a given dimension according to some criteria. Conceptually, they can be thought of as an approximate [GroupByQuery](GroupByQuery.html) over a single dimension with an [Ordering](Ordering.html) spec. TopNs are much faster and resource efficient than GroupBys for this use case. These types of queries take a topN query object and return an array of JSON objects where each object represents a value asked for by the topN query.
A topN query object looks like:
```json
"queryType": "topN",
"dataSource": "sample_data",
"dimension": "sample_dim",
"threshold": 5,
"metric": "count",
"granularity": "all",
"filter": {
"type": "and",
"fields": [
{
"type": "selector",
"dimension": "dim1",
"value": "some_value"
},
{
"type": "selector",
"dimension": "dim2",
"value": "some_other_val"
}
]
},
"aggregations": [
{
"type": "longSum",
"name": "count",
"fieldName": "count"
},
{
"type": "doubleSum",
"name": "some_metric",
"fieldName": "some_metric"
}
],
"postAggregations": [
{
"type": "arithmetic",
"name": "sample_divide",
"fn": "/",
"fields": [
{
"type": "fieldAccess",
"name": "some_metric",
"fieldName": "some_metric"
},
{
"type": "fieldAccess",
"name": "count",
"fieldName": "count"
}
]
}
],
"intervals": [
"2013-08-31T00:00:00.000/2013-09-03T00:00:00.000"
]
}
```
There are 10 parts to a topN query, but 7 of them are shared with [TimeseriesQuery](TimeseriesQuery.html). Please review [TimeseriesQuery](TimeseriesQuery.html) for meanings of fields not defined below.
|property|description|required?|
|--------|-----------|---------|
|dimension|A JSON object defining the dimension that you want the top taken for. For more info, see [DimensionSpecs](DimensionSpecs.html)|yes|
|threshold|An integer defining the N in the topN (i.e. how many you want in the top list)|yes|
|metric|A JSON object specifying the metric to sort by for the top list. For more info, see [TopNMetricSpec](TopNMetricSpec.html).|yes|
Please note the context JSON object is also available for topN queries and should be used with the same caution as the timeseries case.
The format of the results would look like so:
```json
[
{
"timestamp": "2013-08-31T00:00:00.000Z",
"result": [
{
"user": "67.173.175.77",
"count": 111,
"some_metrics": 10669,
"average": 96.11711711711712
},
{
"user": "24.10.49.170",
"count": 88,
"some_metrics": 28344,
"average": 322.09090909090907
},
{
"user": "72.193.24.148",
"count": 70,
"some_metrics": 871,
"average": 12.442857142857143
},
{
"user": "108.46.28.47",
"count": 62,
"some_metrics": 815,
"average": 13.14516129032258
},
{
"user": "99.181.143.133",
"count": 60,
"some_metrics": 2787,
"average": 46.45
}
]
}
]
```

View File

@ -42,6 +42,8 @@ h2. Querying
** "SegmentMetadataQuery":./SegmentMetadataQuery.html
** "TimeBoundaryQuery":./TimeBoundaryQuery.html
** "TimeseriesQuery":./TimeseriesQuery.html
** "TopNQuery":./TopNQuery.html
*** "TopNMetricSpec":./TopNMetricSpec.html
h2. Architecture
* "Design":./Design.html

View File

@ -28,6 +28,7 @@ import io.druid.query.search.search.SearchQuery;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.topn.TopNQuery;
import org.joda.time.Duration;
import org.joda.time.Interval;
@ -40,7 +41,8 @@ import java.util.Map;
@JsonSubTypes.Type(name = Query.SEARCH, value = SearchQuery.class),
@JsonSubTypes.Type(name = Query.TIME_BOUNDARY, value = TimeBoundaryQuery.class),
@JsonSubTypes.Type(name = Query.GROUP_BY, value = GroupByQuery.class),
@JsonSubTypes.Type(name = Query.SEGMENT_METADATA, value = SegmentMetadataQuery.class)
@JsonSubTypes.Type(name = Query.SEGMENT_METADATA, value = SegmentMetadataQuery.class),
@JsonSubTypes.Type(name = Query.TOPN, value = TopNQuery.class)
})
public interface Query<T>
{
@ -49,6 +51,7 @@ public interface Query<T>
public static final String TIME_BOUNDARY = "timeBoundary";
public static final String GROUP_BY = "groupBy";
public static final String SEGMENT_METADATA = "segmentMetadata";
public static final String TOPN = "topN";
public String getDataSource();

View File

@ -0,0 +1,174 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import io.druid.collections.StupidPool;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
/**
*/
public class AggregateTopNMetricFirstAlgorithm implements TopNAlgorithm<int[], TopNParams>
{
private final Capabilities capabilities;
private final TopNQuery query;
private final Comparator<?> comparator;
private final StupidPool<ByteBuffer> bufferPool;
public AggregateTopNMetricFirstAlgorithm(
Capabilities capabilities,
TopNQuery query,
StupidPool<ByteBuffer> bufferPool
)
{
this.capabilities = capabilities;
this.query = query;
this.comparator = query.getTopNMetricSpec()
.getComparator(query.getAggregatorSpecs(), query.getPostAggregatorSpecs());
this.bufferPool = bufferPool;
}
@Override
public TopNParams makeInitParams(
DimensionSelector dimSelector, Cursor cursor
)
{
return new TopNParams(dimSelector, cursor, dimSelector.getValueCardinality(), Integer.MAX_VALUE);
}
@Override
public TopNResultBuilder makeResultBuilder(TopNParams params)
{
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator
);
}
@Override
public void run(
TopNParams params, TopNResultBuilder resultBuilder, int[] ints
)
{
final TopNResultBuilder singleMetricResultBuilder = makeResultBuilder(params);
final String metric = ((NumericTopNMetricSpec) query.getTopNMetricSpec()).getMetric();
// Find either the aggregator or post aggregator to do the topN over
List<AggregatorFactory> condensedAggs = Lists.newArrayList();
for (AggregatorFactory aggregatorSpec : query.getAggregatorSpecs()) {
if (aggregatorSpec.getName().equalsIgnoreCase(metric)) {
condensedAggs.add(aggregatorSpec);
break;
}
}
List<PostAggregator> condensedPostAggs = Lists.newArrayList();
if (condensedAggs.isEmpty()) {
for (PostAggregator postAggregator : query.getPostAggregatorSpecs()) {
if (postAggregator.getName().equalsIgnoreCase(metric)) {
condensedPostAggs.add(postAggregator);
// Add all dependent metrics
for (AggregatorFactory aggregatorSpec : query.getAggregatorSpecs()) {
if (postAggregator.getDependentFields().contains(aggregatorSpec.getName())) {
condensedAggs.add(aggregatorSpec);
}
}
break;
}
}
}
if (condensedAggs.isEmpty() && condensedPostAggs.isEmpty()) {
throw new ISE("WTF! Can't find the metric to do topN over?");
}
// Run topN for only a single metric
TopNQuery singleMetricQuery = new TopNQueryBuilder().copy(query)
.aggregators(condensedAggs)
.postAggregators(condensedPostAggs)
.build();
PooledTopNAlgorithm singleMetricAlgo = new PooledTopNAlgorithm(capabilities, singleMetricQuery, bufferPool);
PooledTopNAlgorithm.PooledTopNParams singleMetricParam = null;
int[] dimValSelector = null;
try {
singleMetricParam = singleMetricAlgo.makeInitParams(params.getDimSelector(), params.getCursor());
singleMetricAlgo.run(
singleMetricParam,
singleMetricResultBuilder,
null
);
// Get only the topN dimension values
dimValSelector = getDimValSelectorForTopNMetric(singleMetricParam, singleMetricResultBuilder);
}
finally {
if (singleMetricParam != null) {
singleMetricAlgo.cleanup(singleMetricParam);
}
}
PooledTopNAlgorithm allMetricAlgo = new PooledTopNAlgorithm(capabilities, query, bufferPool);
PooledTopNAlgorithm.PooledTopNParams allMetricsParam = null;
try {
// Run topN for all metrics for top N dimension values
allMetricsParam = allMetricAlgo.makeInitParams(params.getDimSelector(), params.getCursor());
allMetricAlgo.run(
allMetricsParam,
resultBuilder,
dimValSelector
);
}
finally {
if (allMetricsParam != null) {
allMetricAlgo.cleanup(allMetricsParam);
}
}
}
@Override
public void cleanup(TopNParams params)
{
}
private int[] getDimValSelectorForTopNMetric(TopNParams params, TopNResultBuilder resultBuilder)
{
int[] dimValSelector = new int[params.getDimSelector().getValueCardinality()];
Arrays.fill(dimValSelector, SKIP_POSITION_VALUE);
Iterator<DimValHolder> dimValIter = resultBuilder.getTopNIterator();
while (dimValIter.hasNext()) {
int dimValIndex = (Integer) dimValIter.next().getDimValIndex();
dimValSelector[dimValIndex] = INIT_POSITION_VALUE;
}
return dimValSelector;
}
}

View File

@ -0,0 +1,234 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.metamx.common.Pair;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import java.util.Arrays;
import java.util.List;
/**
*/
public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Parameters extends TopNParams>
implements TopNAlgorithm<DimValSelector, Parameters>
{
protected static Aggregator[] makeAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
{
Aggregator[] aggregators = new Aggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
for (AggregatorFactory spec : aggregatorSpecs) {
aggregators[aggregatorIndex] = spec.factorize(cursor);
++aggregatorIndex;
}
return aggregators;
}
protected static BufferAggregator[] makeBufferAggregators(Cursor cursor, List<AggregatorFactory> aggregatorSpecs)
{
BufferAggregator[] aggregators = new BufferAggregator[aggregatorSpecs.size()];
int aggregatorIndex = 0;
for (AggregatorFactory spec : aggregatorSpecs) {
aggregators[aggregatorIndex] = spec.factorizeBuffered(cursor);
++aggregatorIndex;
}
return aggregators;
}
private final Capabilities capabilities;
protected BaseTopNAlgorithm(Capabilities capabilities)
{
this.capabilities = capabilities;
}
@Override
public void run(
Parameters params,
TopNResultBuilder resultBuilder,
DimValSelector dimValSelector
)
{
boolean hasDimValSelector = (dimValSelector != null);
final int cardinality = params.getCardinality();
int numProcessed = 0;
while (numProcessed < cardinality) {
final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
params.getCursor().reset();
DimValSelector theDimValSelector;
if (!hasDimValSelector) {
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
} else {
theDimValSelector = updateDimValSelector(dimValSelector, numProcessed, numToProcess);
}
DimValAggregateStore aggregatesStore = makeDimValAggregateStore(params);
scanAndAggregate(params, theDimValSelector, aggregatesStore, numProcessed);
updateResults(params, theDimValSelector, aggregatesStore, resultBuilder);
closeAggregators(aggregatesStore);
numProcessed += numToProcess;
}
}
protected abstract DimValSelector makeDimValSelector(Parameters params, int numProcessed, int numToProcess);
protected abstract DimValSelector updateDimValSelector(
DimValSelector dimValSelector,
int numProcessed,
int numToProcess
);
protected abstract DimValAggregateStore makeDimValAggregateStore(Parameters params);
protected abstract void scanAndAggregate(
Parameters params,
DimValSelector dimValSelector,
DimValAggregateStore dimValAggregateStore,
int numProcessed
);
protected abstract void updateResults(
Parameters params,
DimValSelector dimValSelector,
DimValAggregateStore dimValAggregateStore,
TopNResultBuilder resultBuilder
);
protected abstract void closeAggregators(
DimValAggregateStore dimValAggregateStore
);
protected class AggregatorArrayProvider extends BaseArrayProvider<Aggregator[][]>
{
Aggregator[][] expansionAggs;
int cardinality;
public AggregatorArrayProvider(DimensionSelector dimSelector, TopNQuery query, int cardinality)
{
super(dimSelector, query, capabilities);
this.expansionAggs = new Aggregator[cardinality][];
this.cardinality = cardinality;
}
@Override
public Aggregator[][] build()
{
Pair<Integer, Integer> startEnd = computeStartEnd(cardinality);
Arrays.fill(expansionAggs, 0, startEnd.lhs, EMPTY_ARRAY);
Arrays.fill(expansionAggs, startEnd.lhs, startEnd.rhs, null);
Arrays.fill(expansionAggs, startEnd.rhs, expansionAggs.length, EMPTY_ARRAY);
return expansionAggs;
}
}
protected static abstract class BaseArrayProvider<T> implements TopNMetricSpecBuilder<T>
{
private volatile String previousStop;
private volatile boolean ignoreAfterThreshold;
private volatile int ignoreFirstN;
private volatile int keepOnlyN;
private final DimensionSelector dimSelector;
private final TopNQuery query;
private final Capabilities capabilities;
public BaseArrayProvider(
DimensionSelector dimSelector,
TopNQuery query,
Capabilities capabilities
)
{
this.dimSelector = dimSelector;
this.query = query;
this.capabilities = capabilities;
previousStop = null;
ignoreAfterThreshold = false;
ignoreFirstN = 0;
keepOnlyN = dimSelector.getValueCardinality();
}
@Override
public void skipTo(String previousStop)
{
if (capabilities.dimensionValuesSorted()) {
this.previousStop = previousStop;
}
}
@Override
public void ignoreAfterThreshold()
{
ignoreAfterThreshold = true;
}
@Override
public void ignoreFirstN(int n)
{
ignoreFirstN = n;
}
@Override
public void keepOnlyN(int n)
{
keepOnlyN = n;
}
protected Pair<Integer, Integer> computeStartEnd(int cardinality)
{
int startIndex = ignoreFirstN;
if (previousStop != null) {
int lookupId = dimSelector.lookupId(previousStop) + 1;
if (lookupId < 0) {
lookupId *= -1;
}
if (lookupId > ignoreFirstN + keepOnlyN) {
startIndex = ignoreFirstN + keepOnlyN;
} else {
startIndex = Math.max(lookupId, startIndex);
}
}
int endIndex = Math.min(ignoreFirstN + keepOnlyN, cardinality);
if (ignoreAfterThreshold && query.getDimensionsFilter() == null) {
endIndex = Math.min(endIndex, startIndex + query.getThreshold());
}
return Pair.of(startIndex, endIndex);
}
}
}

View File

@ -0,0 +1,90 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import io.druid.query.BySegmentResultValue;
import io.druid.query.Result;
import java.util.List;
/**
*/
public class BySegmentTopNResultValue extends TopNResultValue implements BySegmentResultValue<TopNResultValue>
{
private final List<Result<TopNResultValue>> results;
private final String segmentId;
private final String intervalString;
@JsonCreator
public BySegmentTopNResultValue(
@JsonProperty("results") List<Result<TopNResultValue>> results,
@JsonProperty("segment") String segmentId,
@JsonProperty("interval") String intervalString
)
{
super(null);
this.results = results;
this.segmentId = segmentId;
this.intervalString = intervalString;
}
@Override
@JsonValue(false)
public List<DimensionAndMetricValueExtractor> getValue()
{
throw new UnsupportedOperationException();
}
@Override
@JsonProperty("results")
public List<Result<TopNResultValue>> getResults()
{
return results;
}
@Override
@JsonProperty("segment")
public String getSegmentId()
{
return segmentId;
}
@Override
@JsonProperty("interval")
public String getIntervalString()
{
return intervalString;
}
@Override
public String toString()
{
return "BySegmentTopNResultValue{" +
"results=" + results +
", segmentId='" + segmentId + '\'' +
", intervalString='" + intervalString + '\'' +
'}';
}
}

View File

@ -0,0 +1,169 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.collect.Maps;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.util.Comparator;
import java.util.Map;
/**
*/
public class DimExtractionTopNAlgorithm extends BaseTopNAlgorithm<Aggregator[][], Map<String, Aggregator[]>, TopNParams>
{
private final TopNQuery query;
private final Comparator<?> comparator;
public DimExtractionTopNAlgorithm(
Capabilities capabilities,
TopNQuery query
)
{
super(capabilities);
this.query = query;
this.comparator = query.getTopNMetricSpec()
.getComparator(query.getAggregatorSpecs(), query.getPostAggregatorSpecs());
}
@Override
public TopNParams makeInitParams(
final DimensionSelector dimSelector, final Cursor cursor
)
{
return new TopNParams(dimSelector, cursor, dimSelector.getValueCardinality(), Integer.MAX_VALUE);
}
@Override
public TopNResultBuilder makeResultBuilder(TopNParams params)
{
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator
);
}
@Override
protected Aggregator[][] makeDimValSelector(TopNParams params, int numProcessed, int numToProcess)
{
return query.getTopNMetricSpec().configureOptimizer(
new AggregatorArrayProvider(params.getDimSelector(), query, params.getCardinality())
).build();
}
@Override
protected Aggregator[][] updateDimValSelector(Aggregator[][] aggregators, int numProcessed, int numToProcess)
{
return aggregators;
}
@Override
protected Map<String, Aggregator[]> makeDimValAggregateStore(TopNParams params)
{
return Maps.newHashMap();
}
@Override
public void scanAndAggregate(
TopNParams params,
Aggregator[][] rowSelector,
Map<String, Aggregator[]> aggregatesStore,
int numProcessed
)
{
final Cursor cursor = params.getCursor();
final DimensionSelector dimSelector = params.getDimSelector();
while (!cursor.isDone()) {
final IndexedInts dimValues = dimSelector.getRow();
for (int i = 0; i < dimValues.size(); ++i) {
final int dimIndex = dimValues.get(i);
Aggregator[] theAggregators = rowSelector[dimIndex];
if (theAggregators == null) {
String key = query.getDimensionSpec().getDimExtractionFn().apply(dimSelector.lookupName(dimIndex));
if (key == null) {
rowSelector[dimIndex] = EMPTY_ARRAY;
continue;
}
theAggregators = aggregatesStore.get(key);
if (theAggregators == null) {
theAggregators = makeAggregators(cursor, query.getAggregatorSpecs());
aggregatesStore.put(key, theAggregators);
}
rowSelector[dimIndex] = theAggregators;
}
for (Aggregator aggregator : theAggregators) {
aggregator.aggregate();
}
}
cursor.advance();
}
}
@Override
protected void updateResults(
TopNParams params,
Aggregator[][] rowSelector,
Map<String, Aggregator[]> aggregatesStore,
TopNResultBuilder resultBuilder
)
{
for (Map.Entry<String, Aggregator[]> entry : aggregatesStore.entrySet()) {
Aggregator[] aggs = entry.getValue();
if (aggs != null && aggs.length > 0) {
Object[] vals = new Object[aggs.length];
for (int i = 0; i < aggs.length; i++) {
vals[i] = aggs[i].get();
}
resultBuilder.addEntry(
entry.getKey(),
entry.getKey(),
vals,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
);
}
}
}
@Override
protected void closeAggregators(Map<String, Aggregator[]> stringMap)
{
for (Aggregator[] aggregators : stringMap.values()) {
for (Aggregator agg : aggregators) {
agg.close();
}
}
}
@Override
public void cleanup(TopNParams params)
{
}
}

View File

@ -0,0 +1,110 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import java.util.Map;
/**
*/
public class DimValHolder
{
private final Object topNMetricVal;
private final String dimName;
private final Object dimValIndex;
private final Map<String, Object> metricValues;
public DimValHolder(
Object topNMetricVal,
String dimName,
Object dimValIndex,
Map<String, Object> metricValues
)
{
this.topNMetricVal = topNMetricVal;
this.dimName = dimName;
this.dimValIndex = dimValIndex;
this.metricValues = metricValues;
}
public Object getTopNMetricVal()
{
return topNMetricVal;
}
public String getDimName()
{
return dimName;
}
public Object getDimValIndex()
{
return dimValIndex;
}
public Map<String, Object> getMetricValues()
{
return metricValues;
}
public static class Builder
{
private Object topNMetricVal;
private String dirName;
private Object dimValIndex;
private Map<String, Object> metricValues;
public Builder()
{
topNMetricVal = null;
dirName = null;
dimValIndex = null;
metricValues = null;
}
public Builder withTopNMetricVal(Object topNMetricVal)
{
this.topNMetricVal = topNMetricVal;
return this;
}
public Builder withDirName(String dirName)
{
this.dirName = dirName;
return this;
}
public Builder withDimValIndex(Object dimValIndex)
{
this.dimValIndex = dimValIndex;
return this;
}
public Builder withMetricValues(Map<String, Object> metricValues)
{
this.metricValues = metricValues;
return this;
}
public DimValHolder build()
{
return new DimValHolder(topNMetricVal, dirName, dimValIndex, metricValues);
}
}
}

View File

@ -0,0 +1,83 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import io.druid.query.MetricValueExtractor;
import java.util.Map;
/**
*/
public class DimensionAndMetricValueExtractor extends MetricValueExtractor
{
private final Map<String, Object> value;
@JsonCreator
public DimensionAndMetricValueExtractor(Map<String, Object> value)
{
super(value);
this.value = value;
}
public String getStringDimensionValue(String dimension)
{
return (String) value.get(dimension);
}
public Object getDimensionValue(String dimension)
{
return value.get(dimension);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DimensionAndMetricValueExtractor that = (DimensionAndMetricValueExtractor) o;
if (value != null ? !value.equals(that.value) : that.value != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return value != null ? value.hashCode() : 0;
}
@Override
public String toString()
{
return "DimensionAndMetricValueExtractor{" +
"value=" + value +
'}';
}
}

View File

@ -0,0 +1,104 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.common.guava.Comparators;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
/**
*/
public class InvertedTopNMetricSpec implements TopNMetricSpec
{
private static final byte CACHE_TYPE_ID = 0x3;
private final TopNMetricSpec delegate;
@JsonCreator
public InvertedTopNMetricSpec(
@JsonProperty("metric") TopNMetricSpec delegate
)
{
this.delegate = delegate;
}
@Override
public void verifyPreconditions(
List<AggregatorFactory> aggregatorSpecs,
List<PostAggregator> postAggregatorSpecs
)
{
delegate.verifyPreconditions(aggregatorSpecs, postAggregatorSpecs);
}
@JsonProperty("metric")
public TopNMetricSpec getDelegate()
{
return delegate;
}
@Override
public Comparator getComparator(
List<AggregatorFactory> aggregatorSpecs,
List<PostAggregator> postAggregatorSpecs
)
{
return Comparators.inverse(delegate.getComparator(aggregatorSpecs, postAggregatorSpecs));
}
@Override
public TopNResultBuilder getResultBuilder(
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
)
{
return delegate.getResultBuilder(timestamp, dimSpec, threshold, comparator);
}
@Override
public byte[] getCacheKey()
{
final byte[] cacheKey = delegate.getCacheKey();
return ByteBuffer.allocate(1 + cacheKey.length).put(CACHE_TYPE_ID).put(cacheKey).array();
}
@Override
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder)
{
return delegate.configureOptimizer(builder);
}
@Override
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector)
{
delegate.initTopNAlgorithmSelector(selector);
}
}

View File

@ -0,0 +1,51 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.metamx.common.IAE;
import java.util.Map;
/**
*/
public class LegacyTopNMetricSpec extends NumericTopNMetricSpec
{
private static final String convertValue(Object metric)
{
final String retVal;
if (metric instanceof String) {
retVal = (String) metric;
} else if (metric instanceof Map) {
retVal = (String) ((Map) metric).get("metric");
} else {
throw new IAE("Unknown type[%s] for metric[%s]", metric.getClass(), metric);
}
return retVal;
}
@JsonCreator
public LegacyTopNMetricSpec(Object metric)
{
super(convertValue(metric));
}
}

View File

@ -0,0 +1,121 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.primitives.UnsignedBytes;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
/**
*/
public class LexicographicTopNMetricSpec implements TopNMetricSpec
{
private static final byte CACHE_TYPE_ID = 0x1;
private static Comparator<String> comparator = new Comparator<String>()
{
@Override
public int compare(String s, String s2)
{
return UnsignedBytes.lexicographicalComparator().compare(s.getBytes(Charsets.UTF_8), s2.getBytes(Charsets.UTF_8));
}
};
private final String previousStop;
@JsonCreator
public LexicographicTopNMetricSpec(
@JsonProperty("previousStop") String previousStop
)
{
this.previousStop = (previousStop == null) ? "" : previousStop;
}
@Override
public void verifyPreconditions(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs)
{
}
@JsonProperty
public String getPreviousStop()
{
return previousStop;
}
@Override
public Comparator getComparator(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs)
{
return comparator;
}
@Override
public TopNResultBuilder getResultBuilder(
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
)
{
return new TopNLexicographicResultBuilder(timestamp, dimSpec, threshold, previousStop, comparator);
}
@Override
public byte[] getCacheKey()
{
byte[] previousStopBytes = previousStop.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + previousStopBytes.length)
.put(CACHE_TYPE_ID)
.put(previousStopBytes)
.array();
}
@Override
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder)
{
builder.skipTo(previousStop);
builder.ignoreAfterThreshold();
return builder;
}
@Override
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector)
{
selector.setAggregateAllMetrics(true);
}
@Override
public String toString()
{
return "LexicographicTopNMetricSpec{" +
"previousStop='" + previousStop + '\'' +
'}';
}
}

View File

@ -0,0 +1,160 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
/**
*/
public class NumericTopNMetricSpec implements TopNMetricSpec
{
private static final byte CACHE_TYPE_ID = 0x0;
private final String metric;
@JsonCreator
public NumericTopNMetricSpec(
@JsonProperty("metric") String metric
)
{
this.metric = metric;
}
@Override
public void verifyPreconditions(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs)
{
Preconditions.checkNotNull(metric, "metric can't be null");
Preconditions.checkNotNull(aggregatorSpecs, "aggregations cannot be null");
Preconditions.checkArgument(aggregatorSpecs.size() > 0, "Must have at least one AggregatorFactory");
final AggregatorFactory aggregator = Iterables.tryFind(
aggregatorSpecs,
new Predicate<AggregatorFactory>()
{
@Override
public boolean apply(AggregatorFactory input)
{
return input.getName().equals(metric);
}
}
).orNull();
final PostAggregator postAggregator = Iterables.tryFind(
postAggregatorSpecs,
new Predicate<PostAggregator>()
{
@Override
public boolean apply(PostAggregator input)
{
return input.getName().equals(metric);
}
}
).orNull();
Preconditions.checkArgument(
aggregator != null || postAggregator != null,
"Must have an AggregatorFactory or PostAggregator for metric[%s], gave[%s] and [%s]",
metric,
aggregatorSpecs,
postAggregatorSpecs
);
}
@JsonProperty
public String getMetric()
{
return metric;
}
@Override
public Comparator getComparator(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs)
{
Comparator comp = null;
for (AggregatorFactory factory : aggregatorSpecs) {
if (metric.equals(factory.getName())) {
comp = factory.getComparator();
break;
}
}
for (PostAggregator pf : postAggregatorSpecs) {
if (metric.equals(pf.getName())) {
comp = pf.getComparator();
break;
}
}
return comp;
}
@Override
public TopNResultBuilder getResultBuilder(
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
)
{
return new TopNNumericResultBuilder(timestamp, dimSpec, metric, threshold, comparator);
}
@Override
public byte[] getCacheKey()
{
byte[] metricBytes = metric.getBytes(Charsets.UTF_8);
return ByteBuffer.allocate(1 + metricBytes.length)
.put(CACHE_TYPE_ID)
.put(metricBytes)
.array();
}
@Override
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder)
{
return builder;
}
@Override
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector)
{
selector.setAggregateTopNMetricFirst(true);
}
@Override
public String toString()
{
return "NumericTopNMetricSpec{" +
"metric='" + metric + '\'' +
'}';
}
}

View File

@ -0,0 +1,401 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.io.Closeables;
import com.metamx.common.Pair;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidPool;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
/**
*/
public class PooledTopNAlgorithm extends BaseTopNAlgorithm<int[], BufferAggregator[], PooledTopNAlgorithm.PooledTopNParams>
{
private final Capabilities capabilities;
private final TopNQuery query;
private final Comparator<?> comparator;
private final StupidPool<ByteBuffer> bufferPool;
public PooledTopNAlgorithm(
Capabilities capabilities,
TopNQuery query,
StupidPool<ByteBuffer> bufferPool
)
{
super(capabilities);
this.capabilities = capabilities;
this.query = query;
this.comparator = query.getTopNMetricSpec()
.getComparator(query.getAggregatorSpecs(), query.getPostAggregatorSpecs());
this.bufferPool = bufferPool;
}
@Override
public PooledTopNParams makeInitParams(
DimensionSelector dimSelector, Cursor cursor
)
{
ResourceHolder<ByteBuffer> resultsBufHolder = bufferPool.take();
ByteBuffer resultsBuf = resultsBufHolder.get();
resultsBuf.clear();
final int cardinality = dimSelector.getValueCardinality();
final TopNMetricSpecBuilder<int[]> arrayProvider = new BaseArrayProvider<int[]>(
dimSelector,
query,
capabilities
)
{
private final int[] positions = new int[cardinality];
@Override
public int[] build()
{
Pair<Integer, Integer> startEnd = computeStartEnd(cardinality);
Arrays.fill(positions, 0, startEnd.lhs, SKIP_POSITION_VALUE);
Arrays.fill(positions, startEnd.lhs, startEnd.rhs, INIT_POSITION_VALUE);
Arrays.fill(positions, startEnd.rhs, positions.length, SKIP_POSITION_VALUE);
return positions;
}
};
final int numBytesToWorkWith = resultsBuf.remaining();
final int[] aggregatorSizes = new int[query.getAggregatorSpecs().size()];
int numBytesPerRecord = 0;
for (int i = 0; i < query.getAggregatorSpecs().size(); ++i) {
aggregatorSizes[i] = query.getAggregatorSpecs().get(i).getMaxIntermediateSize();
numBytesPerRecord += aggregatorSizes[i];
}
final int numValuesPerPass = numBytesToWorkWith / numBytesPerRecord;
return PooledTopNParams.builder()
.withDimSelector(dimSelector)
.withCursor(cursor)
.withCardinality(cardinality)
.withResultsBufHolder(resultsBufHolder)
.withResultsBuf(resultsBuf)
.withArrayProvider(arrayProvider)
.withNumBytesPerRecord(numBytesPerRecord)
.withNumValuesPerPass(numValuesPerPass)
.withAggregatorSizes(aggregatorSizes)
.build();
}
@Override
public TopNResultBuilder makeResultBuilder(PooledTopNParams params)
{
return query.getTopNMetricSpec().getResultBuilder(
params.getCursor().getTime(), query.getDimensionSpec(), query.getThreshold(), comparator
);
}
@Override
protected int[] makeDimValSelector(PooledTopNParams params, int numProcessed, int numToProcess)
{
final TopNMetricSpecBuilder<int[]> arrayProvider = params.getArrayProvider();
arrayProvider.ignoreFirstN(numProcessed);
arrayProvider.keepOnlyN(numToProcess);
return query.getTopNMetricSpec().configureOptimizer(arrayProvider).build();
}
protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int numToProcess)
{
final int[] retVal = Arrays.copyOf(dimValSelector, dimValSelector.length);
final int validEnd = Math.min(retVal.length, numProcessed + numToProcess);
final int end = Math.max(retVal.length, validEnd);
Arrays.fill(retVal, 0, numProcessed, SKIP_POSITION_VALUE);
Arrays.fill(retVal, validEnd, end, SKIP_POSITION_VALUE);
return retVal;
}
@Override
protected BufferAggregator[] makeDimValAggregateStore(PooledTopNParams params)
{
return makeBufferAggregators(params.getCursor(), query.getAggregatorSpecs());
}
@Override
protected void scanAndAggregate(
PooledTopNParams params,
int[] positions,
BufferAggregator[] theAggregators,
int numProcessed
)
{
final ByteBuffer resultsBuf = params.getResultsBuf();
final int numBytesPerRecord = params.getNumBytesPerRecord();
final int[] aggregatorSizes = params.getAggregatorSizes();
final Cursor cursor = params.getCursor();
final DimensionSelector dimSelector = params.getDimSelector();
while (!cursor.isDone()) {
final IndexedInts dimValues = dimSelector.getRow();
for (int i = 0; i < dimValues.size(); ++i) {
final int dimIndex = dimValues.get(i);
int position = positions[dimIndex];
switch (position) {
case SKIP_POSITION_VALUE:
break;
case INIT_POSITION_VALUE:
positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord;
position = positions[dimIndex];
for (int j = 0; j < theAggregators.length; ++j) {
theAggregators[j].init(resultsBuf, position);
position += aggregatorSizes[j];
}
position = positions[dimIndex];
default:
for (int j = 0; j < theAggregators.length; ++j) {
theAggregators[j].aggregate(resultsBuf, position);
position += aggregatorSizes[j];
}
}
}
cursor.advance();
}
}
@Override
protected void updateResults(
PooledTopNParams params,
int[] positions,
BufferAggregator[] theAggregators,
TopNResultBuilder resultBuilder
)
{
final ByteBuffer resultsBuf = params.getResultsBuf();
final int[] aggregatorSizes = params.getAggregatorSizes();
final DimensionSelector dimSelector = params.getDimSelector();
for (int i = 0; i < positions.length; i++) {
int position = positions[i];
if (position >= 0) {
Object[] vals = new Object[theAggregators.length];
for (int j = 0; j < theAggregators.length; j++) {
vals[j] = theAggregators[j].get(resultsBuf, position);
position += aggregatorSizes[j];
}
resultBuilder.addEntry(
dimSelector.lookupName(i),
i,
vals,
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
);
}
}
}
@Override
protected void closeAggregators(BufferAggregator[] bufferAggregators)
{
for(BufferAggregator agg : bufferAggregators) {
agg.close();
}
}
@Override
public void cleanup(PooledTopNParams params)
{
ResourceHolder<ByteBuffer> resultsBufHolder = params.getResultsBufHolder();
if (resultsBufHolder != null) {
resultsBufHolder.get().clear();
}
Closeables.closeQuietly(resultsBufHolder);
}
public static class PooledTopNParams extends TopNParams
{
public static Builder builder()
{
return new Builder();
}
private final ResourceHolder<ByteBuffer> resultsBufHolder;
private final ByteBuffer resultsBuf;
private final int[] aggregatorSizes;
private final int numBytesPerRecord;
private final TopNMetricSpecBuilder<int[]> arrayProvider;
public PooledTopNParams(
DimensionSelector dimSelector,
Cursor cursor,
int cardinality,
ResourceHolder<ByteBuffer> resultsBufHolder,
ByteBuffer resultsBuf,
int[] aggregatorSizes,
int numBytesPerRecord,
int numValuesPerPass,
TopNMetricSpecBuilder<int[]> arrayProvider
)
{
super(dimSelector, cursor, cardinality, numValuesPerPass);
this.resultsBufHolder = resultsBufHolder;
this.resultsBuf = resultsBuf;
this.aggregatorSizes = aggregatorSizes;
this.numBytesPerRecord = numBytesPerRecord;
this.arrayProvider = arrayProvider;
}
public ResourceHolder<ByteBuffer> getResultsBufHolder()
{
return resultsBufHolder;
}
public ByteBuffer getResultsBuf()
{
return resultsBuf;
}
public int[] getAggregatorSizes()
{
return aggregatorSizes;
}
public int getNumBytesPerRecord()
{
return numBytesPerRecord;
}
public TopNMetricSpecBuilder<int[]> getArrayProvider()
{
return arrayProvider;
}
public static class Builder
{
private DimensionSelector dimSelector;
private Cursor cursor;
private int cardinality;
private ResourceHolder<ByteBuffer> resultsBufHolder;
private ByteBuffer resultsBuf;
private int[] aggregatorSizes;
private int numBytesPerRecord;
private int numValuesPerPass;
private TopNMetricSpecBuilder<int[]> arrayProvider;
public Builder()
{
dimSelector = null;
cursor = null;
cardinality = 0;
resultsBufHolder = null;
resultsBuf = null;
aggregatorSizes = null;
numBytesPerRecord = 0;
numValuesPerPass = 0;
arrayProvider = null;
}
public Builder withDimSelector(DimensionSelector dimSelector)
{
this.dimSelector = dimSelector;
return this;
}
public Builder withCursor(Cursor cursor)
{
this.cursor = cursor;
return this;
}
public Builder withCardinality(int cardinality)
{
this.cardinality = cardinality;
return this;
}
public Builder withResultsBufHolder(ResourceHolder<ByteBuffer> resultsBufHolder)
{
this.resultsBufHolder = resultsBufHolder;
return this;
}
public Builder withResultsBuf(ByteBuffer resultsBuf)
{
this.resultsBuf = resultsBuf;
return this;
}
public Builder withAggregatorSizes(int[] aggregatorSizes)
{
this.aggregatorSizes = aggregatorSizes;
return this;
}
public Builder withNumBytesPerRecord(int numBytesPerRecord)
{
this.numBytesPerRecord = numBytesPerRecord;
return this;
}
public Builder withNumValuesPerPass(int numValuesPerPass)
{
this.numValuesPerPass = numValuesPerPass;
return this;
}
public Builder withArrayProvider(TopNMetricSpecBuilder<int[]> arrayProvider)
{
this.arrayProvider = arrayProvider;
return this;
}
public PooledTopNParams build()
{
return new PooledTopNParams(
dimSelector,
cursor,
cardinality,
resultsBufHolder,
resultsBuf,
aggregatorSizes,
numBytesPerRecord,
numValuesPerPass,
arrayProvider
);
}
}
}
}

View File

@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
/**
*/
public interface TopNAlgorithm<DimValSelector, Parameters extends TopNParams>
{
public static final Aggregator[] EMPTY_ARRAY = {};
public static final int INIT_POSITION_VALUE = -1;
public static final int SKIP_POSITION_VALUE = -2;
public TopNParams makeInitParams(DimensionSelector dimSelector, Cursor cursor);
public TopNResultBuilder makeResultBuilder(Parameters params);
public void run(
Parameters params,
TopNResultBuilder resultBuilder,
DimValSelector dimValSelector
);
public void cleanup(Parameters params);
}

View File

@ -0,0 +1,72 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
/**
*/
public class TopNAlgorithmSelector
{
private final int cardinality;
private final int numBytesPerRecord;
private volatile boolean hasDimExtractionFn;
private volatile boolean aggregateAllMetrics;
private volatile boolean aggregateTopNMetricFirst;
public TopNAlgorithmSelector(int cardinality, int numBytesPerRecord)
{
this.cardinality = cardinality;
this.numBytesPerRecord = numBytesPerRecord;
}
public void setHasDimExtractionFn(boolean hasDimExtractionFn)
{
this.hasDimExtractionFn = hasDimExtractionFn;
}
public void setAggregateAllMetrics(boolean aggregateAllMetrics)
{
this.aggregateAllMetrics = aggregateAllMetrics;
}
public void setAggregateTopNMetricFirst(boolean aggregateTopNMetricFirst)
{
// These are just heuristics based on an analysis of where an inflection point may lie to switch
// between different algorithms
if (cardinality > 400000 && numBytesPerRecord > 100) {
this.aggregateTopNMetricFirst = aggregateTopNMetricFirst;
}
}
public boolean isHasDimExtractionFn()
{
return hasDimExtractionFn;
}
public boolean isAggregateAllMetrics()
{
return aggregateAllMetrics;
}
public boolean isAggregateTopNMetricFirst()
{
return aggregateTopNMetricFirst;
}
}

View File

@ -0,0 +1,126 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.metamx.common.guava.nary.BinaryFn;
import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<TopNResultValue>, Result<TopNResultValue>>
{
private final TopNResultMerger merger;
private final DimensionSpec dimSpec;
private final QueryGranularity gran;
private final String dimension;
private final TopNMetricSpec topNMetricSpec;
private final int threshold;
private final List<AggregatorFactory> aggregations;
private final List<PostAggregator> postAggregations;
private final Comparator comparator;
public TopNBinaryFn(
final TopNResultMerger merger,
final QueryGranularity granularity,
final DimensionSpec dimSpec,
final TopNMetricSpec topNMetricSpec,
final int threshold,
final List<AggregatorFactory> aggregatorSpecs,
final List<PostAggregator> postAggregatorSpecs
)
{
this.merger = merger;
this.dimSpec = dimSpec;
this.gran = granularity;
this.topNMetricSpec = topNMetricSpec;
this.threshold = threshold;
this.aggregations = aggregatorSpecs;
this.postAggregations = postAggregatorSpecs;
this.dimension = dimSpec.getOutputName();
this.comparator = topNMetricSpec.getComparator(aggregatorSpecs, postAggregatorSpecs);
}
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> arg1, Result<TopNResultValue> arg2)
{
if (arg1 == null) {
return merger.getResult(arg2, comparator);
}
if (arg2 == null) {
return merger.getResult(arg1, comparator);
}
Map<String, DimensionAndMetricValueExtractor> retVals = new LinkedHashMap<String, DimensionAndMetricValueExtractor>();
TopNResultValue arg1Vals = arg1.getValue();
TopNResultValue arg2Vals = arg2.getValue();
for (DimensionAndMetricValueExtractor arg1Val : arg1Vals) {
retVals.put(arg1Val.getStringDimensionValue(dimension), arg1Val);
}
for (DimensionAndMetricValueExtractor arg2Val : arg2Vals) {
final String dimensionValue = arg2Val.getStringDimensionValue(dimension);
DimensionAndMetricValueExtractor arg1Val = retVals.get(dimensionValue);
if (arg1Val != null) {
Map<String, Object> retVal = new LinkedHashMap<String, Object>();
retVal.put(dimension, dimensionValue);
for (AggregatorFactory factory : aggregations) {
final String metricName = factory.getName();
retVal.put(metricName, factory.combine(arg1Val.getMetric(metricName), arg2Val.getMetric(metricName)));
}
for (PostAggregator pf : postAggregations) {
retVal.put(pf.getName(), pf.compute(retVal));
}
retVals.put(dimensionValue, new DimensionAndMetricValueExtractor(retVal));
} else {
retVals.put(dimensionValue, arg2Val);
}
}
final DateTime timestamp;
if (gran instanceof AllGranularity) {
timestamp = arg1.getTimestamp();
} else {
timestamp = gran.toDateTime(gran.truncate(arg1.getTimestamp().getMillis()));
}
TopNResultBuilder bob = topNMetricSpec.getResultBuilder(timestamp, dimSpec, threshold, comparator);
for (DimensionAndMetricValueExtractor extractor : retVals.values()) {
bob.addEntry(extractor);
}
return bob.build();
}
}

View File

@ -0,0 +1,134 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNLexicographicResultBuilder implements TopNResultBuilder
{
private final DateTime timestamp;
private final DimensionSpec dimSpec;
private final String previousStop;
private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNLexicographicResultBuilder(
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
String previousStop,
final Comparator comparator
)
{
this.timestamp = timestamp;
this.dimSpec = dimSpec;
this.previousStop = previousStop;
instantiatePQueue(threshold, comparator);
}
@Override
public TopNResultBuilder addEntry(
String dimName,
Object dimValIndex,
Object[] metricVals,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
Map<String, Object> metricValues = Maps.newLinkedHashMap();
if (dimName.compareTo(previousStop) > 0) {
metricValues.put(dimSpec.getOutputName(), dimName);
Iterator<AggregatorFactory> aggsIter = aggFactories.iterator();
for (Object metricVal : metricVals) {
metricValues.put(aggsIter.next().getName(), metricVal);
}
for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
}
pQueue.add(new DimValHolder.Builder().withDirName(dimName).withMetricValues(metricValues).build());
}
return this;
}
@Override
public TopNResultBuilder addEntry(DimensionAndMetricValueExtractor dimensionAndMetricValueExtractor)
{
pQueue.add(
new DimValHolder.Builder().withDirName(dimensionAndMetricValueExtractor.getStringDimensionValue(dimSpec.getOutputName()))
.withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
.build()
);
return this;
}
@Override
public Iterator<DimValHolder> getTopNIterator()
{
return pQueue.iterator();
}
@Override
public Result<TopNResultValue> build()
{
// Pull out top aggregated values
List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(pQueue.size());
while (!pQueue.isEmpty()) {
values.add(pQueue.remove().getMetricValues());
}
return new Result<TopNResultValue>(timestamp, new TopNResultValue(values));
}
private void instantiatePQueue(int threshold, final Comparator comparator)
{
this.pQueue = MinMaxPriorityQueue.orderedBy(
new Comparator<DimValHolder>()
{
@Override
public int compare(
DimValHolder o1,
DimValHolder o2
)
{
return comparator.compare(o1.getDimName(), o2.getDimName());
}
}
).maximumSize(threshold).create();
}
}

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.base.Function;
import io.druid.query.Result;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
public class TopNMapFn implements Function<Cursor, Result<TopNResultValue>>
{
private final TopNQuery query;
private final TopNAlgorithm topNAlgorithm;
public TopNMapFn(
TopNQuery query,
TopNAlgorithm topNAlgorithm
)
{
this.query = query;
this.topNAlgorithm = topNAlgorithm;
}
@Override
@SuppressWarnings("unchecked")
public Result<TopNResultValue> apply(Cursor cursor)
{
final DimensionSelector dimSelector = cursor.makeDimensionSelector(query.getDimensionSpec().getDimension());
if (dimSelector == null) {
return null;
}
TopNParams params = null;
try {
params = topNAlgorithm.makeInitParams(dimSelector, cursor);
TopNResultBuilder resultBuilder = topNAlgorithm.makeResultBuilder(params);
topNAlgorithm.run(params, resultBuilder, null);
return resultBuilder.build();
}
finally {
topNAlgorithm.cleanup(params);
}
}
}

View File

@ -0,0 +1,58 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.util.Comparator;
import java.util.List;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = LegacyTopNMetricSpec.class)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "numeric", value = NumericTopNMetricSpec.class),
@JsonSubTypes.Type(name = "lexicographic", value = LexicographicTopNMetricSpec.class),
@JsonSubTypes.Type(name = "inverted", value = InvertedTopNMetricSpec.class)
})
public interface TopNMetricSpec
{
public void verifyPreconditions(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs);
public Comparator getComparator(List<AggregatorFactory> aggregatorSpecs, List<PostAggregator> postAggregatorSpecs);
public TopNResultBuilder getResultBuilder(
DateTime timestamp,
DimensionSpec dimSpec,
int threshold,
Comparator comparator
);
public byte[] getCacheKey();
public <T> TopNMetricSpecBuilder<T> configureOptimizer(TopNMetricSpecBuilder<T> builder);
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector);
}

View File

@ -0,0 +1,35 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
/**
*/
public interface TopNMetricSpecBuilder<T>
{
public void skipTo(String previousStop);
public void ignoreAfterThreshold();
public void ignoreFirstN(int n);
public void keepOnlyN(int n);
public T build();
}

View File

@ -0,0 +1,153 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNNumericResultBuilder implements TopNResultBuilder
{
private final DateTime timestamp;
private final DimensionSpec dimSpec;
private final String metricName;
private MinMaxPriorityQueue<DimValHolder> pQueue = null;
public TopNNumericResultBuilder(
DateTime timestamp,
DimensionSpec dimSpec,
String metricName,
int threshold,
final Comparator comparator
)
{
this.timestamp = timestamp;
this.dimSpec = dimSpec;
this.metricName = metricName;
instantiatePQueue(threshold, comparator);
}
@Override
public TopNResultBuilder addEntry(
String dimName,
Object dimValIndex,
Object[] metricVals,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
)
{
Map<String, Object> metricValues = Maps.newLinkedHashMap();
metricValues.put(dimSpec.getOutputName(), dimName);
Iterator<AggregatorFactory> aggFactoryIter = aggFactories.iterator();
for (Object metricVal : metricVals) {
metricValues.put(aggFactoryIter.next().getName(), metricVal);
}
for (PostAggregator postAgg : postAggs) {
metricValues.put(postAgg.getName(), postAgg.compute(metricValues));
}
Object topNMetricVal = metricValues.get(metricName);
pQueue.add(
new DimValHolder.Builder().withTopNMetricVal(topNMetricVal)
.withDirName(dimName)
.withDimValIndex(dimValIndex)
.withMetricValues(metricValues)
.build()
);
return this;
}
@Override
public TopNResultBuilder addEntry(DimensionAndMetricValueExtractor dimensionAndMetricValueExtractor)
{
pQueue.add(
new DimValHolder.Builder().withTopNMetricVal(dimensionAndMetricValueExtractor.getDimensionValue(metricName))
.withDirName(dimSpec.getOutputName())
.withMetricValues(dimensionAndMetricValueExtractor.getBaseObject())
.build()
);
return this;
}
@Override
public Iterator<DimValHolder> getTopNIterator()
{
return pQueue.iterator();
}
@Override
public Result<TopNResultValue> build()
{
// Pull out top aggregated values
List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(pQueue.size());
while (!pQueue.isEmpty()) {
values.add(pQueue.remove().getMetricValues());
}
return new Result<TopNResultValue>(
timestamp,
new TopNResultValue(values)
);
}
private void instantiatePQueue(int threshold, final Comparator comparator)
{
this.pQueue = MinMaxPriorityQueue.orderedBy(
new Comparator<DimValHolder>()
{
@Override
public int compare(DimValHolder d1, DimValHolder d2)
{
int retVal = comparator.compare(d2.getTopNMetricVal(), d1.getTopNMetricVal());
if (retVal == 0) {
if (d1.getDimName() == null) {
retVal = -1;
} else if (d2.getDimName() == null) {
retVal = 1;
} else {
retVal = d1.getDimName().compareTo(d2.getDimName());
}
}
return retVal;
}
}
).maximumSize(threshold).create();
}
}

View File

@ -0,0 +1,61 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
/**
*/
public class TopNParams
{
private final DimensionSelector dimSelector;
private final Cursor cursor;
private final int cardinality;
private final int numValuesPerPass;
protected TopNParams(DimensionSelector dimSelector, Cursor cursor, int cardinality, int numValuesPerPass)
{
this.dimSelector = dimSelector;
this.cursor = cursor;
this.cardinality = cardinality;
this.numValuesPerPass = numValuesPerPass;
}
public DimensionSelector getDimSelector()
{
return dimSelector;
}
public Cursor getCursor()
{
return cursor;
}
public int getCardinality()
{
return cardinality;
}
public int getNumValuesPerPass()
{
return numValuesPerPass;
}
}

View File

@ -0,0 +1,211 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.druid.granularity.QueryGranularity;
import io.druid.query.BaseQuery;
import io.druid.query.Queries;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.spec.QuerySegmentSpec;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
{
public static final String TOPN = "topN";
private final DimensionSpec dimensionSpec;
private final TopNMetricSpec topNMetricSpec;
private final int threshold;
private final DimFilter dimFilter;
private final QueryGranularity granularity;
private final List<AggregatorFactory> aggregatorSpecs;
private final List<PostAggregator> postAggregatorSpecs;
@JsonCreator
public TopNQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("dimension") DimensionSpec dimensionSpec,
@JsonProperty("metric") TopNMetricSpec topNMetricSpec,
@JsonProperty("threshold") int threshold,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("aggregations") List<AggregatorFactory> aggregatorSpecs,
@JsonProperty("postAggregations") List<PostAggregator> postAggregatorSpecs,
@JsonProperty("context") Map<String, String> context
)
{
super(dataSource, querySegmentSpec, context);
this.dimensionSpec = dimensionSpec;
this.topNMetricSpec = topNMetricSpec;
this.threshold = threshold;
this.dimFilter = dimFilter;
this.granularity = granularity;
this.aggregatorSpecs = aggregatorSpecs;
this.postAggregatorSpecs = postAggregatorSpecs == null ? ImmutableList.<PostAggregator>of() : postAggregatorSpecs;
Preconditions.checkNotNull(dimensionSpec, "dimensionSpec can't be null");
Preconditions.checkNotNull(topNMetricSpec, "must specify a metric");
Preconditions.checkArgument(threshold != 0, "Threshold cannot be equal to 0.");
topNMetricSpec.verifyPreconditions(this.aggregatorSpecs, this.postAggregatorSpecs);
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
}
@Override
public boolean hasFilters()
{
return dimFilter != null;
}
@Override
public String getType()
{
return TOPN;
}
@JsonProperty("dimension")
public DimensionSpec getDimensionSpec()
{
return dimensionSpec;
}
@JsonProperty("metric")
public TopNMetricSpec getTopNMetricSpec()
{
return topNMetricSpec;
}
@JsonProperty("threshold")
public int getThreshold()
{
return threshold;
}
@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}
@JsonProperty
public QueryGranularity getGranularity()
{
return granularity;
}
@JsonProperty("aggregations")
public List<AggregatorFactory> getAggregatorSpecs()
{
return aggregatorSpecs;
}
@JsonProperty("postAggregations")
public List<PostAggregator> getPostAggregatorSpecs()
{
return postAggregatorSpecs;
}
public void initTopNAlgorithmSelector(TopNAlgorithmSelector selector)
{
if (dimensionSpec.getDimExtractionFn() != null) {
selector.setHasDimExtractionFn(true);
}
topNMetricSpec.initTopNAlgorithmSelector(selector);
}
public TopNQuery withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
{
return new TopNQuery(
getDataSource(),
dimensionSpec,
topNMetricSpec,
threshold,
querySegmentSpec,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
}
public TopNQuery withThreshold(int threshold)
{
return new TopNQuery(
getDataSource(),
dimensionSpec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
getContext()
);
}
public TopNQuery withOverriddenContext(Map<String, String> contextOverrides)
{
return new TopNQuery(
getDataSource(),
dimensionSpec,
topNMetricSpec,
threshold,
getQuerySegmentSpec(),
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
computeOverridenContext(contextOverrides)
);
}
@Override
public String toString()
{
return "TopNQuery{" +
"dataSource='" + getDataSource() + '\'' +
", dimensionSpec=" + dimensionSpec +
", topNMetricSpec=" + topNMetricSpec +
", threshold=" + threshold +
", querySegmentSpec=" + getQuerySegmentSpec() +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +
", aggregatorSpecs=" + aggregatorSpecs +
", postAggregatorSpecs=" + postAggregatorSpecs +
'}';
}
}

View File

@ -0,0 +1,290 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.OrDimFilter;
import io.druid.query.filter.SelectorDimFilter;
import io.druid.query.spec.LegacySegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import org.joda.time.Interval;
import java.util.List;
import java.util.Map;
/**
* A Builder for TopNQuery.
* <p/>
* Required: dataSource(), intervals(), metric() and threshold() must be called before build()
* Additional requirement for numeric metric sorts: aggregators() must be called before build()
* <p/>
* Optional: filters(), granularity(), postAggregators() and context() can be called before build()
* <p/>
* Usage example:
* <pre><code>
* TopNQuery query = new TopNQueryBuilder()
* .dataSource("Example")
* .dimension("example_dim")
* .metric("example_metric")
* .threshold(100)
* .intervals("2012-01-01/2012-01-02")
* .build();
* </code></pre>
*
* @see io.druid.query.topn.TopNQuery
*/
public class TopNQueryBuilder
{
private String dataSource;
private DimensionSpec dimensionSpec;
private TopNMetricSpec topNMetricSpec;
private int threshold;
private QuerySegmentSpec querySegmentSpec;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<AggregatorFactory> aggregatorSpecs;
private List<PostAggregator> postAggregatorSpecs;
private Map<String, String> context;
public TopNQueryBuilder()
{
dataSource = "";
dimensionSpec = null;
topNMetricSpec = null;
threshold = 0;
querySegmentSpec = null;
dimFilter = null;
granularity = QueryGranularity.ALL;
aggregatorSpecs = Lists.newArrayList();
postAggregatorSpecs = Lists.newArrayList();
context = null;
}
public String getDataSource()
{
return dataSource;
}
public DimensionSpec getDimensionSpec()
{
return dimensionSpec;
}
public TopNMetricSpec getTopNMetricSpec()
{
return topNMetricSpec;
}
public int getThreshold()
{
return threshold;
}
public QuerySegmentSpec getQuerySegmentSpec()
{
return querySegmentSpec;
}
public DimFilter getDimFilter()
{
return dimFilter;
}
public QueryGranularity getGranularity()
{
return granularity;
}
public List<AggregatorFactory> getAggregatorSpecs()
{
return aggregatorSpecs;
}
public List<PostAggregator> getPostAggregatorSpecs()
{
return postAggregatorSpecs;
}
public Map<String, String> getContext()
{
return context;
}
public TopNQuery build()
{
return new TopNQuery(
dataSource,
dimensionSpec,
topNMetricSpec,
threshold,
querySegmentSpec,
dimFilter,
granularity,
aggregatorSpecs,
postAggregatorSpecs,
context
);
}
public TopNQueryBuilder copy(TopNQuery query)
{
return new TopNQueryBuilder()
.dataSource(query.getDataSource())
.dimension(query.getDimensionSpec())
.metric(query.getTopNMetricSpec())
.threshold(query.getThreshold())
.intervals(query.getIntervals())
.filters(query.getDimensionsFilter())
.granularity(query.getGranularity())
.aggregators(query.getAggregatorSpecs())
.postAggregators(query.getPostAggregatorSpecs())
.context(query.getContext());
}
public TopNQueryBuilder copy(TopNQueryBuilder builder)
{
return new TopNQueryBuilder()
.dataSource(builder.dataSource)
.dimension(builder.dimensionSpec)
.metric(builder.topNMetricSpec)
.threshold(builder.threshold)
.intervals(builder.querySegmentSpec)
.filters(builder.dimFilter)
.granularity(builder.granularity)
.aggregators(builder.aggregatorSpecs)
.postAggregators(builder.postAggregatorSpecs)
.context(builder.context);
}
public TopNQueryBuilder dataSource(String d)
{
dataSource = d;
return this;
}
public TopNQueryBuilder dimension(String d)
{
return dimension(d, null);
}
public TopNQueryBuilder dimension(String d, String outputName)
{
return dimension(new DefaultDimensionSpec(d, outputName));
}
public TopNQueryBuilder dimension(DimensionSpec d)
{
dimensionSpec = d;
return this;
}
public TopNQueryBuilder metric(String s)
{
return metric(new NumericTopNMetricSpec(s));
}
public TopNQueryBuilder metric(TopNMetricSpec t)
{
topNMetricSpec = t;
return this;
}
public TopNQueryBuilder threshold(int i)
{
threshold = i;
return this;
}
public TopNQueryBuilder intervals(QuerySegmentSpec q)
{
querySegmentSpec = q;
return this;
}
public TopNQueryBuilder intervals(String s)
{
querySegmentSpec = new LegacySegmentSpec(s);
return this;
}
public TopNQueryBuilder intervals(List<Interval> l)
{
querySegmentSpec = new LegacySegmentSpec(l);
return this;
}
public TopNQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value);
return this;
}
public TopNQueryBuilder filters(String dimensionName, String value, String... values)
{
List<DimFilter> fields = Lists.<DimFilter>newArrayList(new SelectorDimFilter(dimensionName, value));
for (String val : values) {
fields.add(new SelectorDimFilter(dimensionName, val));
}
dimFilter = new OrDimFilter(fields);
return this;
}
public TopNQueryBuilder filters(DimFilter f)
{
dimFilter = f;
return this;
}
public TopNQueryBuilder granularity(String g)
{
granularity = QueryGranularity.fromString(g);
return this;
}
public TopNQueryBuilder granularity(QueryGranularity g)
{
granularity = g;
return this;
}
public TopNQueryBuilder aggregators(List<AggregatorFactory> a)
{
aggregatorSpecs = a;
return this;
}
public TopNQueryBuilder postAggregators(List<PostAggregator> p)
{
postAggregatorSpecs = p;
return this;
}
public TopNQueryBuilder context(Map<String, String> c)
{
context = c;
return this;
}
}

View File

@ -0,0 +1,39 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.query.QueryConfig;
import javax.validation.constraints.Min;
/**
*/
public class TopNQueryConfig extends QueryConfig
{
@JsonProperty
@Min(1)
private int minTopNThreshold = 1000;
public int getMinTopNThreshold()
{
return minTopNThreshold;
}
}

View File

@ -0,0 +1,117 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import io.druid.collections.StupidPool;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.filter.Filter;
import io.druid.segment.Capabilities;
import io.druid.segment.Cursor;
import io.druid.segment.StorageAdapter;
import io.druid.segment.filter.Filters;
import org.joda.time.Interval;
import java.nio.ByteBuffer;
import java.util.List;
/**
*/
public class TopNQueryEngine
{
private static final Logger log = new Logger(TopNQueryEngine.class);
private final StupidPool<ByteBuffer> bufferPool;
public TopNQueryEngine(StupidPool<ByteBuffer> bufferPool)
{
this.bufferPool = bufferPool;
}
public Iterable<Result<TopNResultValue>> query(final TopNQuery query, final StorageAdapter adapter)
{
final List<Interval> queryIntervals = query.getQuerySegmentSpec().getIntervals();
final Filter filter = Filters.convertDimensionFilters(query.getDimensionsFilter());
final QueryGranularity granularity = query.getGranularity();
final Function<Cursor, Result<TopNResultValue>> mapFn = getMapFn(query, adapter);
Preconditions.checkArgument(
queryIntervals.size() == 1, "Can only handle a single interval, got[%s]", queryIntervals
);
if (mapFn == null) {
return Lists.newArrayList();
}
return FunctionalIterable
.create(adapter.makeCursors(filter, queryIntervals.get(0), granularity))
.transform(
new Function<Cursor, Cursor>()
{
@Override
public Cursor apply(Cursor input)
{
log.debug("Running over cursor[%s]", adapter.getInterval(), input.getTime());
return input;
}
}
)
.keep(mapFn);
}
private Function<Cursor, Result<TopNResultValue>> getMapFn(TopNQuery query, final StorageAdapter adapter)
{
if (adapter == null) {
log.warn(
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped. Returning empty results."
);
return null;
}
final Capabilities capabilities = adapter.getCapabilities();
final int cardinality = adapter.getDimensionCardinality(query.getDimensionSpec().getDimension());
int numBytesPerRecord = 0;
for (AggregatorFactory aggregatorFactory : query.getAggregatorSpecs()) {
numBytesPerRecord += aggregatorFactory.getMaxIntermediateSize();
}
final TopNAlgorithmSelector selector = new TopNAlgorithmSelector(cardinality, numBytesPerRecord);
query.initTopNAlgorithmSelector(selector);
TopNAlgorithm topNAlgorithm = null;
if (selector.isHasDimExtractionFn()) {
topNAlgorithm = new DimExtractionTopNAlgorithm(capabilities, query);
} else if (selector.isAggregateAllMetrics()) {
topNAlgorithm = new PooledTopNAlgorithm(capabilities, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst()) {
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(capabilities, query, bufferPool);
} else {
topNAlgorithm = new PooledTopNAlgorithm(capabilities, query, bufferPool);
}
return new TopNMapFn(query, topNAlgorithm);
}
}

View File

@ -0,0 +1,405 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.MergeSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.collections.OrderedMergeSequence;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.IntervalChunkingQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryCacheHelper;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.query.ResultGranularTimestampComparator;
import io.druid.query.ResultMergeQueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultValue>, TopNQuery>
{
private static final byte TOPN_QUERY = 0x1;
private static final Joiner COMMA_JOIN = Joiner.on(",");
private static final TypeReference<Result<TopNResultValue>> TYPE_REFERENCE = new TypeReference<Result<TopNResultValue>>(){};
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE = new TypeReference<Object>(){};
private final TopNQueryConfig config;
@Inject
public TopNQueryQueryToolChest(
TopNQueryConfig config
)
{
this.config = config;
}
@Override
public QueryRunner<Result<TopNResultValue>> mergeResults(QueryRunner<Result<TopNResultValue>> runner)
{
return new ResultMergeQueryRunner<Result<TopNResultValue>>(runner)
{
@Override
protected Ordering<Result<TopNResultValue>> makeOrdering(Query<Result<TopNResultValue>> query)
{
return Ordering.from(
new ResultGranularTimestampComparator<TopNResultValue>(
((TopNQuery) query).getGranularity()
)
);
}
@Override
protected BinaryFn<Result<TopNResultValue>, Result<TopNResultValue>, Result<TopNResultValue>> createMergeFn(
Query<Result<TopNResultValue>> input
)
{
TopNQuery query = (TopNQuery) input;
return new TopNBinaryFn(
TopNResultMerger.identity,
query.getGranularity(),
query.getDimensionSpec(),
query.getTopNMetricSpec(),
query.getThreshold(),
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
);
}
};
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new OrderedMergeSequence<Result<TopNResultValue>>(getOrdering(), seqOfSequences);
}
@Override
public ServiceMetricEvent.Builder makeMetricBuilder(TopNQuery query)
{
int numMinutes = 0;
for (Interval interval : query.getIntervals()) {
numMinutes += Minutes.minutesIn(interval).getMinutes();
}
return new ServiceMetricEvent.Builder()
.setUser2(query.getDataSource())
.setUser4(String.format("topN/%s/%s", query.getThreshold(), query.getDimensionSpec().getDimension()))
.setUser5(COMMA_JOIN.join(query.getIntervals()))
.setUser6(String.valueOf(query.hasFilters()))
.setUser7(String.format("%,d aggs", query.getAggregatorSpecs().size()))
.setUser9(Minutes.minutes(numMinutes).toString());
}
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>> makeMetricManipulatorFn(
final TopNQuery query, final MetricManipulationFn fn
)
{
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
private String dimension = query.getDimensionSpec().getOutputName();
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
result.getValue(),
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(@Nullable DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMap();
for (AggregatorFactory agg : query.getAggregatorSpecs()) {
values.put(agg.getName(), fn.manipulate(agg, input.getMetric(agg.getName())));
}
for (PostAggregator postAgg : query.getPostAggregatorSpecs()) {
values.put(postAgg.getName(), input.getMetric(postAgg.getName()));
}
values.put(dimension, input.getDimensionValue(dimension));
return values;
}
}
)
);
return new Result<TopNResultValue>(
result.getTimestamp(),
new TopNResultValue(serializedValues)
);
}
};
}
@Override
public TypeReference<Result<TopNResultValue>> getResultTypeReference()
{
return TYPE_REFERENCE;
}
@Override
public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrategy(final TopNQuery query)
{
return new CacheStrategy<Result<TopNResultValue>, Object, TopNQuery>()
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
@Override
public byte[] computeCacheKey(TopNQuery query)
{
final byte[] dimensionSpecBytes = query.getDimensionSpec().getCacheKey();
final byte[] metricSpecBytes = query.getTopNMetricSpec().getCacheKey();
final DimFilter dimFilter = query.getDimensionsFilter();
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] aggregatorBytes = QueryCacheHelper.computeAggregatorBytes(query.getAggregatorSpecs());
final byte[] granularityBytes = query.getGranularity().cacheKey();
return ByteBuffer
.allocate(
1 + dimensionSpecBytes.length + metricSpecBytes.length + 4 +
granularityBytes.length + filterBytes.length + aggregatorBytes.length
)
.put(TOPN_QUERY)
.put(dimensionSpecBytes)
.put(metricSpecBytes)
.put(Ints.toByteArray(query.getThreshold()))
.put(granularityBytes)
.put(filterBytes)
.put(aggregatorBytes)
.array();
}
@Override
public TypeReference<Object> getCacheObjectClazz()
{
return OBJECT_TYPE_REFERENCE;
}
@Override
public Function<Result<TopNResultValue>, Object> prepareForCache()
{
return new Function<Result<TopNResultValue>, Object>()
{
@Override
public Object apply(@Nullable final Result<TopNResultValue> input)
{
List<DimensionAndMetricValueExtractor> results = Lists.newArrayList(input.getValue());
final List<Object> retVal = Lists.newArrayListWithCapacity(results.size() + 1);
// make sure to preserve timezone information when caching results
retVal.add(input.getTimestamp().getMillis());
for (DimensionAndMetricValueExtractor result : results) {
List<Object> vals = Lists.newArrayListWithCapacity(aggs.size() + 2);
vals.add(result.getStringDimensionValue(query.getDimensionSpec().getOutputName()));
for (AggregatorFactory agg : aggs) {
vals.add(result.getMetric(agg.getName()));
}
retVal.add(vals);
}
return retVal;
}
};
}
@Override
public Function<Object, Result<TopNResultValue>> pullFromCache()
{
return new Function<Object, Result<TopNResultValue>>()
{
private final QueryGranularity granularity = query.getGranularity();
@Override
public Result<TopNResultValue> apply(@Nullable Object input)
{
List<Object> results = (List<Object>) input;
List<Map<String, Object>> retVal = Lists.newArrayListWithCapacity(results.size());
Iterator<Object> inputIter = results.iterator();
DateTime timestamp = granularity.toDateTime(new DateTime(inputIter.next()).getMillis());
while (inputIter.hasNext()) {
List<Object> result = (List<Object>) inputIter.next();
Map<String, Object> vals = Maps.newLinkedHashMap();
Iterator<AggregatorFactory> aggIter = aggs.iterator();
Iterator<Object> resultIter = result.iterator();
vals.put(query.getDimensionSpec().getOutputName(), resultIter.next());
while (aggIter.hasNext() && resultIter.hasNext()) {
final AggregatorFactory factory = aggIter.next();
vals.put(factory.getName(), factory.deserialize(resultIter.next()));
}
for (PostAggregator postAgg : postAggs) {
vals.put(postAgg.getName(), postAgg.compute(vals));
}
retVal.add(vals);
}
return new Result<TopNResultValue>(timestamp, new TopNResultValue(retVal));
}
};
}
@Override
public Sequence<Result<TopNResultValue>> mergeSequences(Sequence<Sequence<Result<TopNResultValue>>> seqOfSequences)
{
return new MergeSequence<Result<TopNResultValue>>(getOrdering(), seqOfSequences);
}
};
}
@Override
public QueryRunner<Result<TopNResultValue>> preMergeQueryDecoration(QueryRunner<Result<TopNResultValue>> runner)
{
return new IntervalChunkingQueryRunner<Result<TopNResultValue>>(runner, config.getChunkPeriod());
}
@Override
public QueryRunner<Result<TopNResultValue>> postMergeQueryDecoration(final QueryRunner<Result<TopNResultValue>> runner)
{
return new ThresholdAdjustingQueryRunner(runner, config.getMinTopNThreshold());
}
private static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
private final int minTopNThreshold;
public ThresholdAdjustingQueryRunner(
QueryRunner<Result<TopNResultValue>> runner,
int minTopNThreshold
)
{
this.runner = runner;
this.minTopNThreshold = minTopNThreshold;
}
@Override
public Sequence<Result<TopNResultValue>> run(Query<Result<TopNResultValue>> input)
{
if (!(input instanceof TopNQuery)) {
throw new ISE("Can only handle [%s], got [%s]", TopNQuery.class, input.getClass());
}
final TopNQuery query = (TopNQuery) input;
if (query.getThreshold() > minTopNThreshold) {
return runner.run(query);
}
final boolean isBySegment = Boolean.parseBoolean(query.getContextValue("bySegment", "false"));
return Sequences.map(
runner.run(query.withThreshold(minTopNThreshold)),
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> input)
{
if (isBySegment) {
BySegmentTopNResultValue value = (BySegmentTopNResultValue) input.getValue();
return new Result<TopNResultValue>(
input.getTimestamp(),
new BySegmentTopNResultValue(
Lists.transform(
value.getResults(),
new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
@Override
public Result<TopNResultValue> apply(@Nullable Result<TopNResultValue> input)
{
return new Result<TopNResultValue>(
input.getTimestamp(),
new TopNResultValue(
Lists.<Object>newArrayList(
Iterables.limit(
input.getValue(),
query.getThreshold()
)
)
)
);
}
}
),
value.getSegmentId(),
value.getIntervalString()
)
);
}
return new Result<TopNResultValue>(
input.getTimestamp(),
new TopNResultValue(
Lists.<Object>newArrayList(
Iterables.limit(
input.getValue(),
query.getThreshold()
)
)
)
);
}
}
);
}
}
public Ordering<Result<TopNResultValue>> getOrdering()
{
return Ordering.natural();
}
}

View File

@ -0,0 +1,108 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import io.druid.collections.StupidPool;
import io.druid.guice.annotations.Global;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.Result;
import io.druid.segment.Segment;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
/**
*/
public class TopNQueryRunnerFactory implements QueryRunnerFactory<Result<TopNResultValue>, TopNQuery>
{
private final StupidPool<ByteBuffer> computationBufferPool;
private final TopNQueryQueryToolChest toolchest;
@Inject
public TopNQueryRunnerFactory(
@Global StupidPool<ByteBuffer> computationBufferPool,
TopNQueryQueryToolChest toolchest
)
{
this.computationBufferPool = computationBufferPool;
this.toolchest = toolchest;
}
@Override
public QueryRunner<Result<TopNResultValue>> createRunner(final Segment segment)
{
final TopNQueryEngine queryEngine = new TopNQueryEngine(computationBufferPool);
return new QueryRunner<Result<TopNResultValue>>()
{
@Override
public Sequence<Result<TopNResultValue>> run(Query<Result<TopNResultValue>> input)
{
if (!(input instanceof TopNQuery)) {
throw new ISE("Got a [%s] which isn't a %s", input.getClass(), TopNQuery.class);
}
final TopNQuery legacyQuery = (TopNQuery) input;
return new BaseSequence<Result<TopNResultValue>, Iterator<Result<TopNResultValue>>>(
new BaseSequence.IteratorMaker<Result<TopNResultValue>, Iterator<Result<TopNResultValue>>>()
{
@Override
public Iterator<Result<TopNResultValue>> make()
{
return queryEngine.query(legacyQuery, segment.asStorageAdapter()).iterator();
}
@Override
public void cleanup(Iterator<Result<TopNResultValue>> toClean)
{
}
}
);
}
};
}
@Override
public QueryRunner<Result<TopNResultValue>> mergeRunners(
ExecutorService queryExecutor, Iterable<QueryRunner<Result<TopNResultValue>>> queryRunners
)
{
return new ChainedExecutionQueryRunner<Result<TopNResultValue>>(
queryExecutor, toolchest.getOrdering(), queryRunners
);
}
@Override
public QueryToolChest<Result<TopNResultValue>, TopNQuery> getToolchest()
{
return toolchest;
}
}

View File

@ -0,0 +1,48 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import java.util.Iterator;
import java.util.List;
/**
*/
public interface TopNResultBuilder
{
public TopNResultBuilder addEntry(
String dimName,
Object dimValIndex,
Object[] metricVals,
List<AggregatorFactory> aggFactories,
List<PostAggregator> postAggs
);
public TopNResultBuilder addEntry(
DimensionAndMetricValueExtractor dimensionAndMetricValueExtractor
);
public Iterator<DimValHolder> getTopNIterator();
public Result<TopNResultValue> build();
}

View File

@ -0,0 +1,40 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import io.druid.query.Result;
import java.util.Comparator;
/**
*/
public interface TopNResultMerger
{
public static TopNResultMerger identity = new TopNResultMerger()
{
@Override
public Result<TopNResultValue> getResult(Result<TopNResultValue> result, Comparator comparator)
{
return result;
}
};
public Result<TopNResultValue> getResult(Result<TopNResultValue> result, Comparator comparator);
}

View File

@ -0,0 +1,107 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.IAE;
import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNResultValue implements Iterable<DimensionAndMetricValueExtractor>
{
private final List<DimensionAndMetricValueExtractor> value;
@JsonCreator
public TopNResultValue(
List<?> value
)
{
this.value = (value == null) ? Lists.<DimensionAndMetricValueExtractor>newArrayList() : Lists.transform(
value,
new Function<Object, DimensionAndMetricValueExtractor>()
{
@Override
public DimensionAndMetricValueExtractor apply(@Nullable Object input)
{
if (input instanceof Map) {
return new DimensionAndMetricValueExtractor((Map) input);
} else if (input instanceof DimensionAndMetricValueExtractor) {
return (DimensionAndMetricValueExtractor) input;
} else {
throw new IAE("Unknown type for input[%s]", input.getClass());
}
}
}
);
}
@JsonValue
public List<DimensionAndMetricValueExtractor> getValue()
{
return value;
}
@Override
public Iterator<DimensionAndMetricValueExtractor> iterator()
{
return value.iterator();
}
@Override
public String toString()
{
return "TopNResultValue{" +
"value=" + value +
'}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TopNResultValue that = (TopNResultValue) o;
if (value != null ? !value.equals(that.value) : that.value != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
return value != null ? value.hashCode() : 0;
}
}

View File

@ -0,0 +1,83 @@
package io.druid.query;
import com.google.common.base.Supplier;
import io.druid.collections.StupidPool;
import io.druid.query.search.SearchQueryQueryToolChest;
import io.druid.query.search.SearchQueryRunnerFactory;
import io.druid.query.search.search.SearchQueryConfig;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.segment.Segment;
import java.nio.ByteBuffer;
/**
*/
public class TestQueryRunners
{
public static final StupidPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(1024 * 10);
}
}
);
public static final TopNQueryConfig topNConfig = new TopNQueryConfig();
public static StupidPool<ByteBuffer> getPool()
{
return pool;
}
public static <T> QueryRunner<T> makeTopNQueryRunner(
Segment adapter
)
{
QueryRunnerFactory factory = new TopNQueryRunnerFactory(pool, new TopNQueryQueryToolChest(topNConfig));
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
public static <T> QueryRunner<T> makeTimeSeriesQueryRunner(
Segment adapter
)
{
QueryRunnerFactory factory = TimeseriesQueryRunnerFactory.create();
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
public static <T> QueryRunner<T> makeSearchQueryRunner(
Segment adapter
)
{
QueryRunnerFactory factory = new SearchQueryRunnerFactory(new SearchQueryQueryToolChest(new SearchQueryConfig()));
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
public static <T> QueryRunner<T> makeTimeBoundaryQueryRunner(
Segment adapter
)
{
QueryRunnerFactory factory = new TimeBoundaryQueryRunnerFactory();
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
}

View File

@ -0,0 +1,458 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.dimension.DefaultDimensionSpec;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
*/
public class TopNBinaryFnTest
{
final CountAggregatorFactory rowsCount = new CountAggregatorFactory("rows");
final LongSumAggregatorFactory indexLongSum = new LongSumAggregatorFactory("index", "index");
final ConstantPostAggregator constant = new ConstantPostAggregator("const", 1L);
final FieldAccessPostAggregator rowsPostAgg = new FieldAccessPostAggregator("rows", "rows");
final FieldAccessPostAggregator indexPostAgg = new FieldAccessPostAggregator("index", "index");
final ArithmeticPostAggregator addrowsindexconstant = new ArithmeticPostAggregator(
"addrowsindexconstant",
"+",
Lists.newArrayList(constant, rowsPostAgg, indexPostAgg)
);
final List<AggregatorFactory> aggregatorFactories = Arrays.asList(
rowsCount,
indexLongSum
);
final List<PostAggregator> postAggregators = Arrays.<PostAggregator>asList(
addrowsindexconstant
);
private final DateTime currTime = new DateTime();
private void assertTopNMergeResult(Object o1, Object o2)
{
Iterator i1 = ((Iterable) o1).iterator();
Iterator i2 = ((Iterable) o2).iterator();
while (i1.hasNext() && i2.hasNext()) {
Assert.assertEquals(i1.next(), i2.next());
}
Assert.assertTrue(!i1.hasNext() && !i2.hasNext());
}
@Test
public void testMerge()
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", 2L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 4L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 2L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 3L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 0L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 1L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
)
)
)
);
Result<TopNResultValue> actual = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
2,
aggregatorFactories,
postAggregators
).apply(
result1,
result2
);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertTopNMergeResult(expected.getValue(), actual.getValue());
}
@Test
public void testMergeDay()
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", 2L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 4L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 2L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 3L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 0L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 1L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> expected = new Result<TopNResultValue>(
new DateTime(QueryGranularity.DAY.truncate(currTime.getMillis())),
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
)
)
)
);
Result<TopNResultValue> actual = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.DAY,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
2,
aggregatorFactories,
postAggregators
).apply(
result1,
result2
);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertTopNMergeResult(expected.getValue(), actual.getValue());
}
@Test
public void testMergeOneResultNull()
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", 2L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 4L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 2L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> result2 = null;
Result<TopNResultValue> expected = result1;
Result<TopNResultValue> actual = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
2,
aggregatorFactories,
postAggregators
).apply(
result1,
result2
);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertTopNMergeResult(expected.getValue(), actual.getValue());
}
@Test
public void testMergeByPostAgg()
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", 2L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 4L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 2L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 3L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 0L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 1L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
)
)
)
);
Result<TopNResultValue> actual = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("addrowsindexconstant"),
2,
aggregatorFactories,
postAggregators
).apply(
result1,
result2
);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertTopNMergeResult(expected.getValue(), actual.getValue());
}
@Test
public void testMergeShiftedTimestamp()
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 1L,
"index", 2L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 4L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 2L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime.plusHours(2),
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 3L,
"testdim", "1"
),
ImmutableMap.<String, Object>of(
"rows", 2L,
"index", 0L,
"testdim", "2"
),
ImmutableMap.<String, Object>of(
"rows", 0L,
"index", 1L,
"testdim", "3"
)
)
)
);
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.<String, Object>of(
"testdim", "1",
"rows", 3L,
"index", 5L,
"addrowsindexconstant", 9.0
),
ImmutableMap.<String, Object>of(
"testdim", "2",
"rows", 4L,
"index", 4L,
"addrowsindexconstant", 9.0
)
)
)
);
Result<TopNResultValue> actual = new TopNBinaryFn(
TopNResultMerger.identity,
QueryGranularity.ALL,
new DefaultDimensionSpec("testdim", null),
new NumericTopNMetricSpec("index"),
2,
aggregatorFactories,
postAggregators
).apply(
result1,
result2
);
Assert.assertEquals(expected.getTimestamp(), actual.getTimestamp());
assertTopNMergeResult(expected.getValue(), actual.getValue());
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,73 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.query.topn;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.QueryableIndex;
import io.druid.segment.QueryableIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestIndex;
import io.druid.segment.incremental.IncrementalIndex;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
public class TopNQueryRunnerTestHelper
{
@SuppressWarnings("unchecked")
public static Collection<?> makeQueryRunners(
QueryRunnerFactory factory
)
throws IOException
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
return Arrays.asList(
new Object[][]{
{
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mMappedTestIndex))
},
{
makeQueryRunner(factory, new QueryableIndexSegment(null, mergedRealtimeIndex))
}
}
);
}
public static <T> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
factory.createRunner(adapter),
factory.getToolchest()
);
}
}

View File

@ -35,6 +35,8 @@ import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryRunnerFactory;
import java.util.Map;
@ -49,6 +51,7 @@ public class QueryRunnerFactoryModule extends QueryToolChestModule
.put(TimeBoundaryQuery.class, TimeBoundaryQueryRunnerFactory.class)
.put(SegmentMetadataQuery.class, SegmentMetadataQueryRunnerFactory.class)
.put(GroupByQuery.class, GroupByQueryRunnerFactory.class)
.put(TopNQuery.class, TopNQueryRunnerFactory.class)
.build();
@Override

View File

@ -38,6 +38,9 @@ import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.query.timeboundary.TimeBoundaryQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import java.util.Map;
@ -52,6 +55,7 @@ public class QueryToolChestModule implements Module
.put(TimeBoundaryQuery.class, TimeBoundaryQueryQueryToolChest.class)
.put(SegmentMetadataQuery.class, SegmentMetadataQueryQueryToolChest.class)
.put(GroupByQuery.class, GroupByQueryQueryToolChest.class)
.put(TopNQuery.class, TopNQueryQueryToolChest.class)
.build();
@Override
@ -67,5 +71,6 @@ public class QueryToolChestModule implements Module
JsonConfigProvider.bind(binder, "druid.query", QueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.groupBy", GroupByQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.search", SearchQueryConfig.class);
JsonConfigProvider.bind(binder, "druid.query.topN", TopNQueryConfig.class);
}
}