mirror of https://github.com/apache/druid.git
implement special distinctcount
This commit is contained in:
parent
bfc0ae7d2a
commit
2729efca71
|
@ -0,0 +1,76 @@
|
|||
---
|
||||
layout: doc_page
|
||||
---
|
||||
|
||||
# DistinctCount aggregator
|
||||
|
||||
(1) First use single dimension hash-based partitioning to partition data by a dimension for example visitor_id, this to make sure all rows with a particular value for that dimension will go into the same segment or this might over count.
|
||||
(2) Second use distinctCount to calculate exact distinct count, make sure queryGranularity is divide exactly by segmentGranularity or else the result will be wrong.
|
||||
There is some limitations, when use with groupBy, the groupBy keys' numbers should not exceed maxIntermediateRows in every segment, if exceed the result will wrong. And when use with topN, numValuesPerPass should not too big, if too big the distinctCount will use many memory and cause the JVM out of service.
|
||||
|
||||
This has been used in production.
|
||||
|
||||
Example:
|
||||
# Timeseries Query
|
||||
|
||||
```json
|
||||
{
|
||||
"queryType": "timeseries",
|
||||
"dataSource": "sample_datasource",
|
||||
"granularity": "day",
|
||||
"aggregations": [
|
||||
{
|
||||
"type": "distinctCount",
|
||||
"name": "uv",
|
||||
"fieldName": "visitor_id"
|
||||
}
|
||||
],
|
||||
"intervals": [
|
||||
"2016-03-01T00:00:00.000/2013-03-20T00:00:00.000"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
# TopN Query
|
||||
|
||||
```json
|
||||
{
|
||||
"queryType": "topN",
|
||||
"dataSource": "sample_datasource",
|
||||
"dimension": "sample_dim",
|
||||
"threshold": 5,
|
||||
"metric": "uv",
|
||||
"granularity": "all",
|
||||
"aggregations": [
|
||||
{
|
||||
"type": "distinctCount",
|
||||
"name": "uv",
|
||||
"fieldName": "visitor_id"
|
||||
}
|
||||
],
|
||||
"intervals": [
|
||||
"2016-03-06T00:00:00/2016-03-06T23:59:59"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
# GroupBy Query
|
||||
|
||||
```json
|
||||
{
|
||||
"queryType": "groupBy",
|
||||
"dataSource": "sample_datasource",
|
||||
"dimensions": "[sample_dim]",
|
||||
"granularity": "all",
|
||||
"aggregations": [
|
||||
{
|
||||
"type": "distinctCount",
|
||||
"name": "uv",
|
||||
"fieldName": "visitor_id"
|
||||
}
|
||||
],
|
||||
"intervals": [
|
||||
"2016-03-06T00:00:00/2016-03-06T23:59:59"
|
||||
]
|
||||
}
|
||||
```
|
|
@ -47,6 +47,7 @@ If you'd like to take on maintenance for a community extension, please post on [
|
|||
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|
||||
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|
||||
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|
||||
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
|
||||
|
||||
## Promoting Community Extension to Core Extension
|
||||
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
~ or more contributor license agreements. See the NOTICE file
|
||||
~ distributed with this work for additional information
|
||||
~ regarding copyright ownership. Metamarkets licenses this file
|
||||
~ to you under the Apache License, Version 2.0 (the
|
||||
~ "License"); you may not use this file except in compliance
|
||||
~ with the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing,
|
||||
~ software distributed under the License is distributed on an
|
||||
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
~ KIND, either express or implied. See the License for the
|
||||
~ specific language governing permissions and limitations
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>io.druid.extensions.contrib</groupId>
|
||||
<artifactId>druid-distinctcount</artifactId>
|
||||
<name>druid-distinctcount</name>
|
||||
<description>druid-distinctcount</description>
|
||||
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.9.1-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>test</scope>
|
||||
<type>test-jar</type>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
|
||||
/**
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RoaringBitMapFactory.class)
|
||||
@JsonSubTypes(value = {
|
||||
@JsonSubTypes.Type(name = "java", value = JavaBitMapFactory.class),
|
||||
@JsonSubTypes.Type(name = "concise", value = ConciseBitMapFactory.class),
|
||||
@JsonSubTypes.Type(name = "roaring", value = RoaringBitMapFactory.class)
|
||||
})
|
||||
public interface BitMapFactory
|
||||
{
|
||||
/**
|
||||
* Create a new empty bitmap
|
||||
*
|
||||
* @return the new bitmap
|
||||
*/
|
||||
public MutableBitmap makeEmptyMutableBitmap();
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.metamx.collections.bitmap.BitmapFactory;
|
||||
import com.metamx.collections.bitmap.ConciseBitmapFactory;
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
|
||||
public class ConciseBitMapFactory implements BitMapFactory
|
||||
{
|
||||
private static final BitmapFactory bitmapFactory = new ConciseBitmapFactory();
|
||||
|
||||
public ConciseBitMapFactory() {}
|
||||
|
||||
@Override
|
||||
public MutableBitmap makeEmptyMutableBitmap()
|
||||
{
|
||||
return bitmapFactory.makeEmptyMutableBitmap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "ConciseBitMapFactory";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
return this == o || o instanceof ConciseBitMapFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,87 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
|
||||
public class DistinctCountAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final String name;
|
||||
private final DimensionSelector selector;
|
||||
private final MutableBitmap mutableBitmap;
|
||||
|
||||
public DistinctCountAggregator(
|
||||
String name,
|
||||
DimensionSelector selector,
|
||||
MutableBitmap mutableBitmap
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.selector = selector;
|
||||
this.mutableBitmap = mutableBitmap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
for (final Integer index : selector.getRow()) {
|
||||
mutableBitmap.add(index);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
mutableBitmap.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return mutableBitmap.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat()
|
||||
{
|
||||
return (float) mutableBitmap.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
mutableBitmap.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong()
|
||||
{
|
||||
return (long) mutableBitmap.size();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,241 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.StringUtils;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
|
||||
public class DistinctCountAggregatorFactory extends AggregatorFactory
|
||||
{
|
||||
private static final Logger log = new Logger(DistinctCountAggregatorFactory.class);
|
||||
private static final byte CACHE_TYPE_ID = 20;
|
||||
private static final BitMapFactory DEFAULT_BITMAP_FACTORY = new RoaringBitMapFactory();
|
||||
|
||||
private final String name;
|
||||
private final String fieldName;
|
||||
private final BitMapFactory bitMapFactory;
|
||||
|
||||
@JsonCreator
|
||||
public DistinctCountAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") String fieldName,
|
||||
@JsonProperty("bitmapFactory") BitMapFactory bitMapFactory
|
||||
)
|
||||
{
|
||||
Preconditions.checkNotNull(name);
|
||||
Preconditions.checkNotNull(fieldName);
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
this.bitMapFactory = bitMapFactory == null ? DEFAULT_BITMAP_FACTORY : bitMapFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator factorize(ColumnSelectorFactory columnFactory)
|
||||
{
|
||||
DimensionSelector selector = makeDimensionSelector(columnFactory);
|
||||
if (selector == null) {
|
||||
return new EmptyDistinctCountAggregator(name);
|
||||
} else {
|
||||
return new DistinctCountAggregator(
|
||||
name,
|
||||
selector,
|
||||
bitMapFactory.makeEmptyMutableBitmap()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
|
||||
{
|
||||
DimensionSelector selector = makeDimensionSelector(columnFactory);
|
||||
if (selector == null) {
|
||||
return new EmptyDistinctCountBufferAggregator();
|
||||
} else {
|
||||
return new DistinctCountBufferAggregator(makeDimensionSelector(columnFactory));
|
||||
}
|
||||
}
|
||||
|
||||
private DimensionSelector makeDimensionSelector(final ColumnSelectorFactory columnFactory)
|
||||
{
|
||||
return columnFactory.makeDimensionSelector(new DefaultDimensionSpec(fieldName, fieldName));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator getComparator()
|
||||
{
|
||||
return new Comparator()
|
||||
{
|
||||
@Override
|
||||
public int compare(Object o, Object o1)
|
||||
{
|
||||
return Longs.compare(((Number) o).longValue(), ((Number) o1).longValue());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object combine(Object lhs, Object rhs)
|
||||
{
|
||||
if (lhs == null && rhs == null) {
|
||||
return 0L;
|
||||
}
|
||||
if (rhs == null) {
|
||||
return ((Number) lhs).longValue();
|
||||
}
|
||||
if (lhs == null) {
|
||||
return ((Number) rhs).longValue();
|
||||
}
|
||||
return ((Number) lhs).longValue() + ((Number) rhs).longValue();
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return new LongSumAggregatorFactory(name, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns()
|
||||
{
|
||||
return Arrays.<AggregatorFactory>asList(new DistinctCountAggregatorFactory(fieldName, fieldName, bitMapFactory));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object deserialize(Object object)
|
||||
{
|
||||
return object;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return object;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public String getFieldName()
|
||||
{
|
||||
return fieldName;
|
||||
}
|
||||
|
||||
@JsonProperty("bitmap")
|
||||
public BitMapFactory getBitMapFactory()
|
||||
{
|
||||
return bitMapFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> requiredFields()
|
||||
{
|
||||
return Arrays.asList(fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
|
||||
byte[] bitMapFactoryCacheKey = StringUtils.toUtf8(bitMapFactory.toString());
|
||||
return ByteBuffer.allocate(1 + fieldNameBytes.length + bitMapFactoryCacheKey.length)
|
||||
.put(CACHE_TYPE_ID)
|
||||
.put(fieldNameBytes)
|
||||
.put(bitMapFactoryCacheKey)
|
||||
.array();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTypeName()
|
||||
{
|
||||
return "distinctCount";
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIntermediateSize()
|
||||
{
|
||||
return Longs.BYTES;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object getAggregatorStartValue()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
DistinctCountAggregatorFactory that = (DistinctCountAggregatorFactory) o;
|
||||
|
||||
if (!fieldName.equals(that.fieldName)) {
|
||||
return false;
|
||||
}
|
||||
if (!name.equals(that.name)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = name.hashCode();
|
||||
result = 31 * result + fieldName.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "DistinctCountAggregatorFactory{" +
|
||||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
import com.metamx.collections.bitmap.WrappedRoaringBitmap;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.segment.DimensionSelector;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DistinctCountBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private final DimensionSelector selector;
|
||||
private final Map<Integer, MutableBitmap> mutableBitmapCollection = new HashMap<>();
|
||||
|
||||
public DistinctCountBufferAggregator(
|
||||
DimensionSelector selector
|
||||
)
|
||||
{
|
||||
this.selector = selector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
buf.putLong(position, 0L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
MutableBitmap mutableBitmap = getMutableBitmap(buf, position);
|
||||
for (final Integer index : selector.getRow()) {
|
||||
mutableBitmap.add(index);
|
||||
}
|
||||
buf.putLong(position, mutableBitmap.size());
|
||||
}
|
||||
|
||||
private MutableBitmap getMutableBitmap(ByteBuffer buf, int position)
|
||||
{
|
||||
MutableBitmap mutableBitmap = mutableBitmapCollection.get(position);
|
||||
if (mutableBitmap == null) {
|
||||
mutableBitmap = new WrappedRoaringBitmap();
|
||||
mutableBitmapCollection.put(position, mutableBitmap);
|
||||
}
|
||||
return mutableBitmap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getLong(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
return (float) buf.getLong(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return buf.getLong(position);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
mutableBitmapCollection.clear();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import io.druid.initialization.DruidModule;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class DistinctCountDruidModule implements DruidModule
|
||||
{
|
||||
public static final String DISTINCT_COUNT = "distinctCount";
|
||||
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return ImmutableList.of(
|
||||
new SimpleModule("DistinctCountModule").registerSubtypes(
|
||||
new NamedType(DistinctCountAggregatorFactory.class, DISTINCT_COUNT)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
|
||||
public class EmptyDistinctCountAggregator implements Aggregator
|
||||
{
|
||||
|
||||
private final String name;
|
||||
|
||||
public EmptyDistinctCountAggregator(String name)
|
||||
{
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get()
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat()
|
||||
{
|
||||
return (float) 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong()
|
||||
{
|
||||
return (long) 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
public class EmptyDistinctCountBufferAggregator implements BufferAggregator
|
||||
{
|
||||
|
||||
public EmptyDistinctCountBufferAggregator()
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(ByteBuffer buf, int position)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object get(ByteBuffer buf, int position)
|
||||
{
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getFloat(ByteBuffer buf, int position)
|
||||
{
|
||||
return (float) 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLong(ByteBuffer buf, int position)
|
||||
{
|
||||
return (long) 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.metamx.collections.bitmap.BitSetBitmapFactory;
|
||||
import com.metamx.collections.bitmap.BitmapFactory;
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
|
||||
public class JavaBitMapFactory implements BitMapFactory
|
||||
{
|
||||
private static final BitmapFactory bitmapFactory = new BitSetBitmapFactory();
|
||||
|
||||
public JavaBitMapFactory() {}
|
||||
|
||||
@Override
|
||||
public MutableBitmap makeEmptyMutableBitmap()
|
||||
{
|
||||
return bitmapFactory.makeEmptyMutableBitmap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "JavaBitMapFactory";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
return this == o || o instanceof JavaBitMapFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.metamx.collections.bitmap.BitmapFactory;
|
||||
import com.metamx.collections.bitmap.MutableBitmap;
|
||||
import com.metamx.collections.bitmap.RoaringBitmapFactory;
|
||||
|
||||
public class RoaringBitMapFactory implements BitMapFactory
|
||||
{
|
||||
private static final BitmapFactory bitmapFactory = new RoaringBitmapFactory();
|
||||
|
||||
public RoaringBitMapFactory() {}
|
||||
|
||||
@Override
|
||||
public MutableBitmap makeEmptyMutableBitmap()
|
||||
{
|
||||
return bitmapFactory.makeEmptyMutableBitmap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "RoaringBitMapFactory";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
return this == o || o instanceof RoaringBitMapFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
io.druid.query.aggregation.distinctcount.DistinctCountDruidModule
|
|
@ -0,0 +1,172 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Suppliers;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.data.input.Row;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.GroupByQueryEngine;
|
||||
import io.druid.query.groupby.GroupByQueryQueryToolChest;
|
||||
import io.druid.query.groupby.GroupByQueryRunnerFactory;
|
||||
import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
|
||||
import io.druid.query.groupby.orderby.DefaultLimitSpec;
|
||||
import io.druid.query.groupby.orderby.OrderByColumnSpec;
|
||||
import io.druid.segment.IncrementalIndexSegment;
|
||||
import io.druid.segment.Segment;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class DistinctCountGroupByQueryTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testGroupByWithDistinctCountAgg() throws Exception
|
||||
{
|
||||
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
final StupidPool<ByteBuffer> pool = new StupidPool<ByteBuffer>(
|
||||
new Supplier<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
public ByteBuffer get()
|
||||
{
|
||||
return ByteBuffer.allocate(1024 * 1024);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
final GroupByQueryConfig config = new GroupByQueryConfig();
|
||||
config.setMaxIntermediateRows(10000);
|
||||
|
||||
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
|
||||
final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
|
||||
|
||||
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
|
||||
engine,
|
||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
|
||||
configSupplier,
|
||||
new GroupByQueryQueryToolChest(
|
||||
configSupplier, mapper, engine, pool,
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
),
|
||||
pool
|
||||
);
|
||||
|
||||
IncrementalIndex index = new OnheapIncrementalIndex(
|
||||
0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
|
||||
);
|
||||
String visitor_id = "visitor_id";
|
||||
String client_type = "client_type";
|
||||
long timestamp = System.currentTimeMillis();
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp + 1,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp + 2,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "2", client_type, "android")
|
||||
)
|
||||
);
|
||||
|
||||
GroupByQuery query = new GroupByQuery.Builder()
|
||||
.setDataSource(QueryRunnerTestHelper.dataSource)
|
||||
.setGranularity(QueryRunnerTestHelper.allGran)
|
||||
.setDimensions(
|
||||
Arrays.<DimensionSpec>asList(
|
||||
new DefaultDimensionSpec(
|
||||
client_type,
|
||||
client_type
|
||||
)
|
||||
)
|
||||
)
|
||||
.setInterval(QueryRunnerTestHelper.fullOnInterval)
|
||||
.setLimitSpec(
|
||||
new DefaultLimitSpec(
|
||||
Lists.newArrayList(
|
||||
new OrderByColumnSpec(
|
||||
client_type,
|
||||
OrderByColumnSpec.Direction.DESCENDING
|
||||
)
|
||||
), 10
|
||||
)
|
||||
)
|
||||
.setAggregatorSpecs(
|
||||
Lists.newArrayList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new DistinctCountAggregatorFactory("UV", visitor_id, null)
|
||||
)
|
||||
)
|
||||
.build();
|
||||
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
|
||||
|
||||
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(
|
||||
factory,
|
||||
factory.createRunner(incrementalIndexSegment),
|
||||
query
|
||||
);
|
||||
|
||||
List<Row> expectedResults = Arrays.asList(
|
||||
GroupByQueryRunnerTestHelper.createExpectedRow(
|
||||
"1970-01-01T00:00:00.000Z",
|
||||
client_type, "iphone",
|
||||
"UV", 2L,
|
||||
"rows", 2L
|
||||
),
|
||||
GroupByQueryRunnerTestHelper.createExpectedRow(
|
||||
"1970-01-01T00:00:00.000Z",
|
||||
client_type, "android",
|
||||
"UV", 1L,
|
||||
"rows", 1L
|
||||
)
|
||||
);
|
||||
TestHelper.assertExpectedObjects(expectedResults, results, "distinct-count");
|
||||
}
|
||||
}
|
|
@ -0,0 +1,112 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.timeseries.TimeseriesQuery;
|
||||
import io.druid.query.timeseries.TimeseriesQueryEngine;
|
||||
import io.druid.query.timeseries.TimeseriesResultValue;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class DistinctCountTimeseriesQueryTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testTopNWithDistinctCountAgg() throws Exception
|
||||
{
|
||||
TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
|
||||
|
||||
IncrementalIndex index = new OnheapIncrementalIndex(
|
||||
0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
|
||||
);
|
||||
String visitor_id = "visitor_id";
|
||||
String client_type = "client_type";
|
||||
DateTime time = new DateTime("2016-03-04T00:00:00.000Z");
|
||||
long timestamp = time.getMillis();
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "2", client_type, "android")
|
||||
)
|
||||
);
|
||||
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(
|
||||
Lists.newArrayList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new DistinctCountAggregatorFactory("UV", visitor_id, null)
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
final Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
engine.process(
|
||||
query,
|
||||
new IncrementalIndexStorageAdapter(index)
|
||||
),
|
||||
Lists.<Result<TimeseriesResultValue>>newLinkedList()
|
||||
);
|
||||
|
||||
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||
new Result<>(
|
||||
time,
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("UV", 3, "rows", 3L)
|
||||
)
|
||||
)
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.distinctcount;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.data.input.MapBasedInputRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.CountAggregatorFactory;
|
||||
import io.druid.query.topn.TopNQuery;
|
||||
import io.druid.query.topn.TopNQueryBuilder;
|
||||
import io.druid.query.topn.TopNQueryEngine;
|
||||
import io.druid.query.topn.TopNResultValue;
|
||||
import io.druid.segment.TestHelper;
|
||||
import io.druid.segment.incremental.IncrementalIndex;
|
||||
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
|
||||
import io.druid.segment.incremental.OnheapIncrementalIndex;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DistinctCountTopNQueryTest
|
||||
{
|
||||
|
||||
@Test
|
||||
public void testTopNWithDistinctCountAgg() throws Exception
|
||||
{
|
||||
TopNQueryEngine engine = new TopNQueryEngine(
|
||||
new StupidPool<ByteBuffer>(
|
||||
new Supplier<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
public ByteBuffer get()
|
||||
{
|
||||
return ByteBuffer.allocate(1024 * 1024);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
|
||||
IncrementalIndex index = new OnheapIncrementalIndex(
|
||||
0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
|
||||
);
|
||||
String visitor_id = "visitor_id";
|
||||
String client_type = "client_type";
|
||||
DateTime time = new DateTime("2016-03-04T00:00:00.000Z");
|
||||
long timestamp = time.getMillis();
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "0", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "1", client_type, "iphone")
|
||||
)
|
||||
);
|
||||
index.add(
|
||||
new MapBasedInputRow(
|
||||
timestamp,
|
||||
Lists.newArrayList(visitor_id, client_type),
|
||||
ImmutableMap.<String, Object>of(visitor_id, "2", client_type, "android")
|
||||
)
|
||||
);
|
||||
|
||||
TopNQuery query = new TopNQueryBuilder().dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.dimension(client_type)
|
||||
.metric("UV")
|
||||
.threshold(10)
|
||||
.aggregators(
|
||||
Lists.newArrayList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new DistinctCountAggregatorFactory("UV", visitor_id, null)
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
final Iterable<Result<TopNResultValue>> results = Sequences.toList(
|
||||
engine.query(
|
||||
query,
|
||||
new IncrementalIndexStorageAdapter(index)
|
||||
),
|
||||
Lists.<Result<TopNResultValue>>newLinkedList()
|
||||
);
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||
new Result<>(
|
||||
time,
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>of(
|
||||
client_type, "iphone",
|
||||
"UV", 2L,
|
||||
"rows", 2L
|
||||
),
|
||||
ImmutableMap.<String, Object>of(
|
||||
client_type, "android",
|
||||
"UV", 1L,
|
||||
"rows", 1L
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
}
|
||||
}
|
1
pom.xml
1
pom.xml
|
@ -101,6 +101,7 @@
|
|||
<module>extensions-contrib/graphite-emitter</module>
|
||||
<module>extensions-contrib/kafka-eight-simpleConsumer</module>
|
||||
<module>extensions-contrib/rabbitmq</module>
|
||||
<module>extensions-contrib/distinctcount</module>
|
||||
|
||||
<!-- distribution packaging -->
|
||||
<module>distribution</module>
|
||||
|
|
Loading…
Reference in New Issue