Merge pull request #2602 from binlijin/distinctcount

implement special distinctcount
This commit is contained in:
Fangjin Yang 2016-03-28 17:20:17 -07:00
commit 62c1dc7a09
18 changed files with 1375 additions and 0 deletions

View File

@ -0,0 +1,76 @@
---
layout: doc_page
---
# DistinctCount aggregator
(1) First, use single-dimension hash-based partitioning to partition the data by a dimension, for example visitor_id. This ensures that all rows with a particular value for that dimension go into the same segment; otherwise the aggregator may over-count.
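(2) Second, use distinctCount to calculate the exact distinct count. Make sure queryGranularity divides segmentGranularity exactly (per-segment counts are summed, so a result bucket that spans several segments may count the same value more than once); otherwise the result will be wrong.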
There are some limitations. When used with groupBy, the number of groupBy keys must not exceed maxIntermediateRows in any segment; if it does, the result will be wrong. When used with topN, numValuesPerPass should not be too large; if it is, distinctCount will use a lot of memory and may take the JVM out of service.
This has been used in production.
Example:
# Timeseries Query
```json
{
"queryType": "timeseries",
"dataSource": "sample_datasource",
"granularity": "day",
"aggregations": [
{
"type": "distinctCount",
"name": "uv",
"fieldName": "visitor_id"
}
],
"intervals": [
"2016-03-01T00:00:00.000/2016-03-20T00:00:00.000"
]
}
```
# TopN Query
```json
{
"queryType": "topN",
"dataSource": "sample_datasource",
"dimension": "sample_dim",
"threshold": 5,
"metric": "uv",
"granularity": "all",
"aggregations": [
{
"type": "distinctCount",
"name": "uv",
"fieldName": "visitor_id"
}
],
"intervals": [
"2016-03-06T00:00:00/2016-03-06T23:59:59"
]
}
```
# GroupBy Query
```json
{
"queryType": "groupBy",
"dataSource": "sample_datasource",
"dimensions": ["sample_dim"],
"granularity": "all",
"aggregations": [
{
"type": "distinctCount",
"name": "uv",
"fieldName": "visitor_id"
}
],
"intervals": [
"2016-03-06T00:00:00/2016-03-06T23:59:59"
]
}
```

View File

@ -47,6 +47,7 @@ If you'd like to take on maintenance for a community extension, please post on [
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)| |druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)| |druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)| |graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
## Promoting Community Extension to Core Extension ## Promoting Community Extension to Core Extension

View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions.contrib</groupId>
<artifactId>druid-distinctcount</artifactId>
<name>druid-distinctcount</name>
<description>druid-distinctcount</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,42 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.collections.bitmap.MutableBitmap;
/**
 * Supplies the mutable bitmaps that the distinctCount aggregator fills with
 * dimension value ids.
 *
 * The JSON {@code type} property selects the implementation: {@code "java"},
 * {@code "concise"} or {@code "roaring"}. When no type is given, Jackson falls
 * back to {@link RoaringBitMapFactory}.
 */
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RoaringBitMapFactory.class)
@JsonSubTypes({
    @JsonSubTypes.Type(name = "java", value = JavaBitMapFactory.class),
    @JsonSubTypes.Type(name = "concise", value = ConciseBitMapFactory.class),
    @JsonSubTypes.Type(name = "roaring", value = RoaringBitMapFactory.class)
})
public interface BitMapFactory
{
  /**
   * Creates a fresh bitmap with no bits set.
   *
   * @return a new, empty mutable bitmap
   */
  MutableBitmap makeEmptyMutableBitmap();
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.ConciseBitmapFactory;
import com.metamx.collections.bitmap.MutableBitmap;
/**
 * {@link BitMapFactory} whose bitmaps come from a shared {@link ConciseBitmapFactory}.
 *
 * The class carries no state, so every instance is equal to every other one.
 */
public class ConciseBitMapFactory implements BitMapFactory
{
  // One delegate is enough for all instances; it is only used to create bitmaps.
  private static final BitmapFactory DELEGATE = new ConciseBitmapFactory();

  public ConciseBitMapFactory()
  {
  }

  /**
   * @return a new, empty bitmap produced by the shared delegate
   */
  @Override
  public MutableBitmap makeEmptyMutableBitmap()
  {
    return DELEGATE.makeEmptyMutableBitmap();
  }

  /**
   * @return a fixed identifier for this factory type
   */
  @Override
  public String toString()
  {
    return "ConciseBitMapFactory";
  }

  /**
   * Any two instances of this class are interchangeable, so type alone decides equality.
   */
  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    return o instanceof ConciseBitMapFactory;
  }

  /**
   * Constant hash, consistent with the type-only {@link #equals(Object)}.
   */
  @Override
  public int hashCode()
  {
    return 0;
  }
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.metamx.collections.bitmap.MutableBitmap;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.DimensionSelector;
/**
 * On-heap aggregator that records every dimension value id it sees in a bitmap
 * and reports the bitmap's size as the distinct count.
 */
public class DistinctCountAggregator implements Aggregator
{
  private final String name;
  private final DimensionSelector selector;
  private final MutableBitmap mutableBitmap;

  /**
   * @param name          output name of the aggregator
   * @param selector      selector supplying the value ids of the current row
   * @param mutableBitmap bitmap that collects the ids; cleared on reset and close
   */
  public DistinctCountAggregator(
      String name,
      DimensionSelector selector,
      MutableBitmap mutableBitmap
  )
  {
    this.name = name;
    this.selector = selector;
    this.mutableBitmap = mutableBitmap;
  }

  /**
   * Adds every value id of the current row to the bitmap.
   */
  @Override
  public void aggregate()
  {
    for (final Integer valueId : selector.getRow()) {
      mutableBitmap.add(valueId);
    }
  }

  /**
   * Forgets all ids collected so far.
   */
  @Override
  public void reset()
  {
    mutableBitmap.clear();
  }

  /**
   * @return the bitmap size, boxed from the {@code int} the bitmap reports
   */
  @Override
  public Object get()
  {
    return currentSize();
  }

  @Override
  public float getFloat()
  {
    final float asFloat = currentSize();
    return asFloat;
  }

  @Override
  public long getLong()
  {
    final long asLong = currentSize();
    return asLong;
  }

  @Override
  public String getName()
  {
    return name;
  }

  /**
   * Clears the bitmap; no other resources are held.
   */
  @Override
  public void close()
  {
    mutableBitmap.clear();
  }

  // Single place that reads the count, shared by the three typed getters.
  private int currentSize()
  {
    return mutableBitmap.size();
  }
}

View File

@ -0,0 +1,241 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.StringUtils;
import com.metamx.common.logger.Logger;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
 * Factory for the {@code distinctCount} aggregator.
 *
 * Each aggregator counts the distinct value ids of one dimension ({@code fieldName}).
 * Partial results are plain counts that are added together when merged
 * (see {@link #combine(Object, Object)} and {@link #getCombiningFactory()}).
 */
public class DistinctCountAggregatorFactory extends AggregatorFactory
{
  private static final Logger log = new Logger(DistinctCountAggregatorFactory.class);
  private static final byte CACHE_TYPE_ID = 20;
  private static final BitMapFactory DEFAULT_BITMAP_FACTORY = new RoaringBitMapFactory();

  private final String name;
  private final String fieldName;
  private final BitMapFactory bitMapFactory;

  /**
   * @param name          output name of the aggregation; must not be null
   * @param fieldName     dimension whose distinct values are counted; must not be null
   * @param bitMapFactory bitmap implementation to use; a roaring factory is used when null
   */
  @JsonCreator
  public DistinctCountAggregatorFactory(
      @JsonProperty("name") String name,
      @JsonProperty("fieldName") String fieldName,
      @JsonProperty("bitmapFactory") BitMapFactory bitMapFactory
  )
  {
    Preconditions.checkNotNull(name);
    Preconditions.checkNotNull(fieldName);
    this.name = name;
    this.fieldName = fieldName;
    this.bitMapFactory = bitMapFactory == null ? DEFAULT_BITMAP_FACTORY : bitMapFactory;
  }

  /**
   * Creates the on-heap aggregator, or a no-op aggregator when the column factory
   * yields no selector for the dimension.
   */
  @Override
  public Aggregator factorize(ColumnSelectorFactory columnFactory)
  {
    DimensionSelector selector = makeDimensionSelector(columnFactory);
    if (selector == null) {
      return new EmptyDistinctCountAggregator(name);
    } else {
      return new DistinctCountAggregator(
          name,
          selector,
          bitMapFactory.makeEmptyMutableBitmap()
      );
    }
  }

  /**
   * Creates the buffer-backed aggregator, or a no-op aggregator when the column
   * factory yields no selector for the dimension.
   */
  @Override
  public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
  {
    DimensionSelector selector = makeDimensionSelector(columnFactory);
    if (selector == null) {
      return new EmptyDistinctCountBufferAggregator();
    } else {
      // Reuse the selector that was just null-checked instead of building a second one.
      return new DistinctCountBufferAggregator(selector);
    }
  }

  private DimensionSelector makeDimensionSelector(final ColumnSelectorFactory columnFactory)
  {
    return columnFactory.makeDimensionSelector(new DefaultDimensionSpec(fieldName, fieldName));
  }

  /**
   * @return a comparator ordering results by their numeric value as longs
   */
  @Override
  public Comparator getComparator()
  {
    return new Comparator()
    {
      @Override
      public int compare(Object o, Object o1)
      {
        return Longs.compare(((Number) o).longValue(), ((Number) o1).longValue());
      }
    };
  }

  /**
   * Adds two partial counts; a null operand is treated as zero.
   */
  @Override
  public Object combine(Object lhs, Object rhs)
  {
    if (lhs == null && rhs == null) {
      return 0L;
    }
    if (rhs == null) {
      return ((Number) lhs).longValue();
    }
    if (lhs == null) {
      return ((Number) rhs).longValue();
    }
    return ((Number) lhs).longValue() + ((Number) rhs).longValue();
  }

  /**
   * @return a long-sum factory, since partial counts are merged by addition
   */
  @Override
  public AggregatorFactory getCombiningFactory()
  {
    return new LongSumAggregatorFactory(name, name);
  }

  @Override
  public List<AggregatorFactory> getRequiredColumns()
  {
    return Arrays.<AggregatorFactory>asList(new DistinctCountAggregatorFactory(fieldName, fieldName, bitMapFactory));
  }

  /**
   * Intermediate values are already plain numbers, so nothing needs converting.
   */
  @Override
  public Object deserialize(Object object)
  {
    return object;
  }

  @Override
  public Object finalizeComputation(Object object)
  {
    return object;
  }

  @JsonProperty
  public String getFieldName()
  {
    return fieldName;
  }

  /**
   * Serialized under the same property name the creator reads ("bitmapFactory"),
   * so the chosen bitmap implementation survives a JSON round trip.
   */
  @JsonProperty("bitmapFactory")
  public BitMapFactory getBitMapFactory()
  {
    return bitMapFactory;
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @Override
  public List<String> requiredFields()
  {
    return Arrays.asList(fieldName);
  }

  /**
   * @return type id byte, followed by the UTF-8 field name and the bitmap factory's string form
   */
  @Override
  public byte[] getCacheKey()
  {
    byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
    byte[] bitMapFactoryCacheKey = StringUtils.toUtf8(bitMapFactory.toString());
    return ByteBuffer.allocate(1 + fieldNameBytes.length + bitMapFactoryCacheKey.length)
                     .put(CACHE_TYPE_ID)
                     .put(fieldNameBytes)
                     .put(bitMapFactoryCacheKey)
                     .array();
  }

  @Override
  public String getTypeName()
  {
    return "distinctCount";
  }

  /**
   * @return the size of one long, which is all the buffer aggregator stores per slot
   */
  @Override
  public int getMaxIntermediateSize()
  {
    return Longs.BYTES;
  }

  @Override
  public Object getAggregatorStartValue()
  {
    return 0;
  }

  /**
   * Two factories are equal when both their name and field name match.
   */
  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    DistinctCountAggregatorFactory that = (DistinctCountAggregatorFactory) o;
    if (!fieldName.equals(that.fieldName)) {
      return false;
    }
    if (!name.equals(that.name)) {
      return false;
    }
    return true;
  }

  @Override
  public int hashCode()
  {
    int result = name.hashCode();
    result = 31 * result + fieldName.hashCode();
    return result;
  }

  @Override
  public String toString()
  {
    return "DistinctCountAggregatorFactory{" +
           "name='" + name + '\'' +
           ", fieldName='" + fieldName + '\'' +
           '}';
  }
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.bitmap.WrappedRoaringBitmap;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.DimensionSelector;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
/**
 * Buffer aggregator for distinctCount.
 *
 * Only the current count (one long) lives in the buffer. The set of value ids
 * seen for each slot is kept on the heap in a bitmap keyed by the slot's buffer
 * position.
 *
 * NOTE(review): bitmaps are always {@link WrappedRoaringBitmap}; the bitmap
 * factory configured on the aggregator factory is not passed in here.
 */
public class DistinctCountBufferAggregator implements BufferAggregator
{
  private final DimensionSelector selector;
  // Buffer position -> ids seen for the slot at that position.
  private final Map<Integer, MutableBitmap> mutableBitmapCollection = new HashMap<>();

  /**
   * @param selector selector supplying the value ids of the current row
   */
  public DistinctCountBufferAggregator(
      DimensionSelector selector
  )
  {
    this.selector = selector;
  }

  /**
   * Starts the slot at {@code position} from a count of zero.
   *
   * Any bitmap remembered for this position from an earlier use of the slot is
   * dropped as well; otherwise the next aggregate() would add to the old ids and
   * write an inflated count.
   */
  @Override
  public void init(ByteBuffer buf, int position)
  {
    mutableBitmapCollection.remove(position);
    buf.putLong(position, 0L);
  }

  /**
   * Adds the current row's value ids to the slot's bitmap and stores the
   * resulting bitmap size in the buffer.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
    MutableBitmap mutableBitmap = getMutableBitmap(position);
    for (final Integer index : selector.getRow()) {
      mutableBitmap.add(index);
    }
    buf.putLong(position, mutableBitmap.size());
  }

  // Returns the bitmap for the slot, creating it on first use.
  private MutableBitmap getMutableBitmap(int position)
  {
    MutableBitmap mutableBitmap = mutableBitmapCollection.get(position);
    if (mutableBitmap == null) {
      mutableBitmap = new WrappedRoaringBitmap();
      mutableBitmapCollection.put(position, mutableBitmap);
    }
    return mutableBitmap;
  }

  /**
   * @return the count stored in the buffer at {@code position}, as a boxed long
   */
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return buf.getLong(position);
  }

  @Override
  public float getFloat(ByteBuffer buf, int position)
  {
    return (float) buf.getLong(position);
  }

  @Override
  public long getLong(ByteBuffer buf, int position)
  {
    return buf.getLong(position);
  }

  /**
   * Releases all per-slot bitmaps.
   */
  @Override
  public void close()
  {
    mutableBitmapCollection.clear();
  }
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import io.druid.initialization.DruidModule;
import java.util.List;
/**
 * Druid module that makes the distinctCount aggregator factory available to
 * Jackson under the type name {@link #DISTINCT_COUNT}.
 */
public class DistinctCountDruidModule implements DruidModule
{
  public static final String DISTINCT_COUNT = "distinctCount";

  /**
   * @return a single Jackson module registering {@link DistinctCountAggregatorFactory}
   */
  @Override
  public List<? extends Module> getJacksonModules()
  {
    final NamedType aggregatorType = new NamedType(DistinctCountAggregatorFactory.class, DISTINCT_COUNT);
    final SimpleModule jacksonModule = new SimpleModule("DistinctCountModule");
    jacksonModule.registerSubtypes(aggregatorType);
    return ImmutableList.of(jacksonModule);
  }

  /**
   * Nothing to bind; the module only contributes Jackson subtypes.
   */
  @Override
  public void configure(Binder binder)
  {
  }
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import io.druid.query.aggregation.Aggregator;
/**
 * No-op aggregator returned when there is no dimension selector to read from.
 * It ignores all input and always reports zero.
 */
public class EmptyDistinctCountAggregator implements Aggregator
{
  private final String name;

  /**
   * @param name output name of the aggregator
   */
  public EmptyDistinctCountAggregator(String name)
  {
    this.name = name;
  }

  @Override
  public String getName()
  {
    return name;
  }

  /**
   * Does nothing; there is no input to aggregate.
   */
  @Override
  public void aggregate()
  {
  }

  /**
   * Does nothing; there is no state to reset.
   */
  @Override
  public void reset()
  {
  }

  /**
   * @return zero, boxed as a {@code Long}
   */
  @Override
  public Object get()
  {
    return 0L;
  }

  @Override
  public float getFloat()
  {
    return 0f;
  }

  @Override
  public long getLong()
  {
    return 0L;
  }

  /**
   * Does nothing; no resources are held.
   */
  @Override
  public void close()
  {
  }
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import io.druid.query.aggregation.BufferAggregator;
import java.nio.ByteBuffer;
/**
 * No-op buffer aggregator returned when there is no dimension selector to read
 * from. It never touches the buffer and always reports zero.
 */
public class EmptyDistinctCountBufferAggregator implements BufferAggregator
{
  public EmptyDistinctCountBufferAggregator()
  {
  }

  /**
   * Does nothing; the buffer is left untouched.
   */
  @Override
  public void init(ByteBuffer buf, int position)
  {
  }

  /**
   * Does nothing; there is no input to aggregate.
   */
  @Override
  public void aggregate(ByteBuffer buf, int position)
  {
  }

  /**
   * @return zero, boxed as a {@code Long}, regardless of buffer contents
   */
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    return 0L;
  }

  @Override
  public float getFloat(ByteBuffer buf, int position)
  {
    return 0f;
  }

  @Override
  public long getLong(ByteBuffer buf, int position)
  {
    return 0L;
  }

  /**
   * Does nothing; no resources are held.
   */
  @Override
  public void close()
  {
  }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.metamx.collections.bitmap.BitSetBitmapFactory;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.MutableBitmap;
/**
 * {@link BitMapFactory} whose bitmaps come from a shared {@link BitSetBitmapFactory}.
 *
 * The class carries no state, so every instance is equal to every other one.
 */
public class JavaBitMapFactory implements BitMapFactory
{
  // One delegate is enough for all instances; it is only used to create bitmaps.
  private static final BitmapFactory DELEGATE = new BitSetBitmapFactory();

  public JavaBitMapFactory()
  {
  }

  /**
   * @return a new, empty bitmap produced by the shared delegate
   */
  @Override
  public MutableBitmap makeEmptyMutableBitmap()
  {
    return DELEGATE.makeEmptyMutableBitmap();
  }

  /**
   * @return a fixed identifier for this factory type
   */
  @Override
  public String toString()
  {
    return "JavaBitMapFactory";
  }

  /**
   * Any two instances of this class are interchangeable, so type alone decides equality.
   */
  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    return o instanceof JavaBitMapFactory;
  }

  /**
   * Constant hash, consistent with the type-only {@link #equals(Object)}.
   */
  @Override
  public int hashCode()
  {
    return 0;
  }
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.metamx.collections.bitmap.BitmapFactory;
import com.metamx.collections.bitmap.MutableBitmap;
import com.metamx.collections.bitmap.RoaringBitmapFactory;
/**
 * {@link BitMapFactory} whose bitmaps come from a shared {@link RoaringBitmapFactory}.
 *
 * The class carries no state, so every instance is equal to every other one.
 */
public class RoaringBitMapFactory implements BitMapFactory
{
  // One delegate is enough for all instances; it is only used to create bitmaps.
  private static final BitmapFactory DELEGATE = new RoaringBitmapFactory();

  public RoaringBitMapFactory()
  {
  }

  /**
   * @return a new, empty bitmap produced by the shared delegate
   */
  @Override
  public MutableBitmap makeEmptyMutableBitmap()
  {
    return DELEGATE.makeEmptyMutableBitmap();
  }

  /**
   * @return a fixed identifier for this factory type
   */
  @Override
  public String toString()
  {
    return "RoaringBitMapFactory";
  }

  /**
   * Any two instances of this class are interchangeable, so type alone decides equality.
   */
  @Override
  public boolean equals(Object o)
  {
    if (o == this) {
      return true;
    }
    return o instanceof RoaringBitMapFactory;
  }

  /**
   * Constant hash, consistent with the type-only {@link #equals(Object)}.
   */
  @Override
  public int hashCode()
  {
    return 0;
  }
}

View File

@ -0,0 +1 @@
io.druid.query.aggregation.distinctcount.DistinctCountDruidModule

View File

@ -0,0 +1,172 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import io.druid.query.groupby.GroupByQueryQueryToolChest;
import io.druid.query.groupby.GroupByQueryRunnerFactory;
import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.query.groupby.orderby.OrderByColumnSpec;
import io.druid.segment.IncrementalIndexSegment;
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
/**
 * Runs a groupBy query with a distinctCount aggregator against a small
 * in-memory index and checks the per-group counts.
 */
public class DistinctCountGroupByQueryTest
{
  private static final String VISITOR_ID = "visitor_id";
  private static final String CLIENT_TYPE = "client_type";

  @Test
  public void testGroupByWithDistinctCountAgg() throws Exception
  {
    // Query machinery: 1 MiB heap buffers and room for 10000 intermediate rows.
    final ObjectMapper jsonMapper = new DefaultObjectMapper();
    final StupidPool<ByteBuffer> bufferPool = new StupidPool<ByteBuffer>(
        new Supplier<ByteBuffer>()
        {
          @Override
          public ByteBuffer get()
          {
            return ByteBuffer.allocate(1024 * 1024);
          }
        }
    );
    final GroupByQueryConfig groupByConfig = new GroupByQueryConfig();
    groupByConfig.setMaxIntermediateRows(10000);
    final Supplier<GroupByQueryConfig> groupByConfigSupplier = Suppliers.ofInstance(groupByConfig);
    final GroupByQueryEngine groupByEngine = new GroupByQueryEngine(groupByConfigSupplier, bufferPool);
    final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
        groupByConfigSupplier,
        jsonMapper,
        groupByEngine,
        bufferPool,
        QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
    );
    final GroupByQueryRunnerFactory runnerFactory = new GroupByQueryRunnerFactory(
        groupByEngine,
        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
        groupByConfigSupplier,
        toolChest,
        bufferPool
    );

    // Three rows: visitors 0 and 1 on iphone, visitor 2 on android.
    final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex(
        0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
    );
    final long baseTimestamp = System.currentTimeMillis();
    incrementalIndex.add(newRow(baseTimestamp, "0", "iphone"));
    incrementalIndex.add(newRow(baseTimestamp + 1, "1", "iphone"));
    incrementalIndex.add(newRow(baseTimestamp + 2, "2", "android"));

    // Group by client type, ordered by client type descending, limit 10.
    final List<DimensionSpec> groupingDimensions = Arrays.<DimensionSpec>asList(
        new DefaultDimensionSpec(CLIENT_TYPE, CLIENT_TYPE)
    );
    final DefaultLimitSpec orderByClientTypeDesc = new DefaultLimitSpec(
        Lists.newArrayList(
            new OrderByColumnSpec(CLIENT_TYPE, OrderByColumnSpec.Direction.DESCENDING)
        ),
        10
    );
    final GroupByQuery groupByQuery = new GroupByQuery.Builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setGranularity(QueryRunnerTestHelper.allGran)
        .setDimensions(groupingDimensions)
        .setInterval(QueryRunnerTestHelper.fullOnInterval)
        .setLimitSpec(orderByClientTypeDesc)
        .setAggregatorSpecs(
            Lists.newArrayList(
                QueryRunnerTestHelper.rowsCount,
                new DistinctCountAggregatorFactory("UV", VISITOR_ID, null)
            )
        )
        .build();

    final Segment segment = new IncrementalIndexSegment(incrementalIndex, null);
    final Iterable<Row> actualRows = GroupByQueryRunnerTestHelper.runQuery(
        runnerFactory,
        runnerFactory.createRunner(segment),
        groupByQuery
    );

    final List<Row> expectedRows = Arrays.asList(
        GroupByQueryRunnerTestHelper.createExpectedRow(
            "1970-01-01T00:00:00.000Z",
            CLIENT_TYPE, "iphone",
            "UV", 2L,
            "rows", 2L
        ),
        GroupByQueryRunnerTestHelper.createExpectedRow(
            "1970-01-01T00:00:00.000Z",
            CLIENT_TYPE, "android",
            "UV", 1L,
            "rows", 1L
        )
    );
    TestHelper.assertExpectedObjects(expectedRows, actualRows, "distinct-count");
  }

  // Builds an input row carrying the two test dimensions.
  private static MapBasedInputRow newRow(long timestamp, String visitorId, String clientType)
  {
    return new MapBasedInputRow(
        timestamp,
        Lists.newArrayList(VISITOR_ID, CLIENT_TYPE),
        ImmutableMap.<String, Object>of(VISITOR_ID, visitorId, CLIENT_TYPE, clientType)
    );
  }
}

View File

@ -0,0 +1,112 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Druids;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.timeseries.TimeseriesQuery;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.junit.Test;
import java.util.Arrays;
import java.util.List;
/**
 * Runs a timeseries query using {@link DistinctCountAggregatorFactory} directly against an
 * in-memory incremental index and checks the aggregated result.
 */
public class DistinctCountTimeseriesQueryTest
{
  private static final String VISITOR_DIM = "visitor_id";
  private static final String CLIENT_DIM = "client_type";

  /**
   * Indexes three rows that share one timestamp but carry three different visitor ids, then
   * expects a single result reporting UV = 3 and rows = 3.
   *
   * NOTE(review): the method name mentions TopN, yet the query built here is a timeseries query;
   * the name looks like it was carried over from the TopN variant of this test.
   */
  @Test
  public void testTopNWithDistinctCountAgg() throws Exception
  {
    final DateTime rowTime = new DateTime("2016-03-04T00:00:00.000Z");
    final long rowMillis = rowTime.getMillis();

    final IncrementalIndex index = new OnheapIncrementalIndex(
        0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
    );
    addRow(index, rowMillis, "0", "iphone");
    addRow(index, rowMillis, "1", "iphone");
    addRow(index, rowMillis, "2", "android");

    final List<AggregatorFactory> aggregators = Lists.<AggregatorFactory>newArrayList(
        QueryRunnerTestHelper.rowsCount,
        new DistinctCountAggregatorFactory("UV", VISITOR_DIM, null)
    );
    final TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                                        .dataSource(QueryRunnerTestHelper.dataSource)
                                        .granularity(QueryRunnerTestHelper.allGran)
                                        .intervals(QueryRunnerTestHelper.fullOnInterval)
                                        .aggregators(aggregators)
                                        .build();

    final TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
    final Iterable<Result<TimeseriesResultValue>> actual = Sequences.toList(
        engine.process(query, new IncrementalIndexStorageAdapter(index)),
        Lists.<Result<TimeseriesResultValue>>newLinkedList()
    );

    // "UV" is deliberately an int literal while "rows" is a long literal; keep both as written.
    final TimeseriesResultValue expectedValue = new TimeseriesResultValue(
        ImmutableMap.<String, Object>of("UV", 3, "rows", 3L)
    );
    final List<Result<TimeseriesResultValue>> expected = Arrays.asList(
        new Result<TimeseriesResultValue>(rowTime, expectedValue)
    );

    TestHelper.assertExpectedResults(expected, actual);
  }

  /**
   * Adds one input row with the given visitor id and client type to {@code index}.
   */
  private static void addRow(IncrementalIndex index, long timestamp, String visitorId, String clientType)
      throws Exception
  {
    index.add(
        new MapBasedInputRow(
            timestamp,
            Lists.newArrayList(VISITOR_DIM, CLIENT_DIM),
            ImmutableMap.<String, Object>of(VISITOR_DIM, visitorId, CLIENT_DIM, clientType)
        )
    );
  }
}

View File

@ -0,0 +1,140 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.distinctcount;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequences;
import io.druid.collections.StupidPool;
import io.druid.data.input.MapBasedInputRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.topn.TopNQuery;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryEngine;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.joda.time.DateTime;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Runs a topN query using {@link DistinctCountAggregatorFactory} directly against an in-memory
 * incremental index and checks the per-dimension-value results.
 */
public class DistinctCountTopNQueryTest
{
  private static final String VISITOR_DIM = "visitor_id";
  private static final String CLIENT_DIM = "client_type";

  /**
   * Indexes two "iphone" rows and one "android" row, each with a different visitor id, then
   * expects the topN over client_type ordered by "UV" to list iphone (UV = 2, rows = 2)
   * followed by android (UV = 1, rows = 1).
   */
  @Test
  public void testTopNWithDistinctCountAgg() throws Exception
  {
    final DateTime rowTime = new DateTime("2016-03-04T00:00:00.000Z");
    final long rowMillis = rowTime.getMillis();

    // Every call to the supplier hands the pool a fresh 1 MiB heap buffer.
    final Supplier<ByteBuffer> bufferSupplier = new Supplier<ByteBuffer>()
    {
      @Override
      public ByteBuffer get()
      {
        return ByteBuffer.allocate(1024 * 1024);
      }
    };
    final TopNQueryEngine engine = new TopNQueryEngine(new StupidPool<ByteBuffer>(bufferSupplier));

    final IncrementalIndex index = new OnheapIncrementalIndex(
        0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
    );
    addRow(index, rowMillis, "0", "iphone");
    addRow(index, rowMillis, "1", "iphone");
    addRow(index, rowMillis, "2", "android");

    final List<AggregatorFactory> aggregators = Lists.<AggregatorFactory>newArrayList(
        QueryRunnerTestHelper.rowsCount,
        new DistinctCountAggregatorFactory("UV", VISITOR_DIM, null)
    );
    final TopNQuery query = new TopNQueryBuilder()
        .dataSource(QueryRunnerTestHelper.dataSource)
        .granularity(QueryRunnerTestHelper.allGran)
        .intervals(QueryRunnerTestHelper.fullOnInterval)
        .dimension(CLIENT_DIM)
        .metric("UV")
        .threshold(10)
        .aggregators(aggregators)
        .build();

    final Iterable<Result<TopNResultValue>> actual = Sequences.toList(
        engine.query(query, new IncrementalIndexStorageAdapter(index)),
        Lists.<Result<TopNResultValue>>newLinkedList()
    );

    final Map<String, Object> iphoneRow = ImmutableMap.<String, Object>of(
        CLIENT_DIM, "iphone",
        "UV", 2L,
        "rows", 2L
    );
    final Map<String, Object> androidRow = ImmutableMap.<String, Object>of(
        CLIENT_DIM, "android",
        "UV", 1L,
        "rows", 1L
    );
    final List<Result<TopNResultValue>> expected = Arrays.asList(
        new Result<TopNResultValue>(
            rowTime,
            new TopNResultValue(Arrays.<Map<String, Object>>asList(iphoneRow, androidRow))
        )
    );

    TestHelper.assertExpectedResults(expected, actual);
  }

  /**
   * Adds one input row with the given visitor id and client type to {@code index}.
   */
  private static void addRow(IncrementalIndex index, long timestamp, String visitorId, String clientType)
      throws Exception
  {
    index.add(
        new MapBasedInputRow(
            timestamp,
            Lists.newArrayList(VISITOR_DIM, CLIENT_DIM),
            ImmutableMap.<String, Object>of(VISITOR_DIM, visitorId, CLIENT_DIM, clientType)
        )
    );
  }
}

View File

@ -101,6 +101,7 @@
<module>extensions-contrib/graphite-emitter</module> <module>extensions-contrib/graphite-emitter</module>
<module>extensions-contrib/kafka-eight-simpleConsumer</module> <module>extensions-contrib/kafka-eight-simpleConsumer</module>
<module>extensions-contrib/rabbitmq</module> <module>extensions-contrib/rabbitmq</module>
<module>extensions-contrib/distinctcount</module>
<!-- distribution packaging --> <!-- distribution packaging -->
<module>distribution</module> <module>distribution</module>