diff --git a/docs/content/development/extensions-contrib/distinctcount.md b/docs/content/development/extensions-contrib/distinctcount.md
new file mode 100644
index 00000000000..9d05874030f
--- /dev/null
+++ b/docs/content/development/extensions-contrib/distinctcount.md
@@ -0,0 +1,76 @@
+---
+layout: doc_page
+---
+
+# DistinctCount aggregator
+
+(1) First use single dimension hash-based partitioning to partition data by a dimension for example visitor_id, this to make sure all rows with a particular value for that dimension will go into the same segment or this might over count.
+(2) Second use distinctCount to calculate exact distinct count, make sure queryGranularity is divide exactly by segmentGranularity or else the result will be wrong.
+There is some limitations, when use with groupBy, the groupBy keys' numbers should not exceed maxIntermediateRows in every segment, if exceed the result will wrong. And when use with topN, numValuesPerPass should not too big, if too big the distinctCount will use many memory and cause the JVM out of service.
+
+This has been used in production.
+
+Example:
+# Timeseries Query
+
+```json
+{
+ "queryType": "timeseries",
+ "dataSource": "sample_datasource",
+ "granularity": "day",
+ "aggregations": [
+ {
+ "type": "distinctCount",
+ "name": "uv",
+ "fieldName": "visitor_id"
+ }
+ ],
+ "intervals": [
+ "2016-03-01T00:00:00.000/2013-03-20T00:00:00.000"
+ ]
+}
+```
+
+# TopN Query
+
+```json
+{
+ "queryType": "topN",
+ "dataSource": "sample_datasource",
+ "dimension": "sample_dim",
+ "threshold": 5,
+ "metric": "uv",
+ "granularity": "all",
+ "aggregations": [
+ {
+ "type": "distinctCount",
+ "name": "uv",
+ "fieldName": "visitor_id"
+ }
+ ],
+ "intervals": [
+ "2016-03-06T00:00:00/2016-03-06T23:59:59"
+ ]
+}
+```
+
+# GroupBy Query
+
+```json
+{
+ "queryType": "groupBy",
+ "dataSource": "sample_datasource",
+ "dimensions": "[sample_dim]",
+ "granularity": "all",
+ "aggregations": [
+ {
+ "type": "distinctCount",
+ "name": "uv",
+ "fieldName": "visitor_id"
+ }
+ ],
+ "intervals": [
+ "2016-03-06T00:00:00/2016-03-06T23:59:59"
+ ]
+}
+```
diff --git a/docs/content/development/extensions.md b/docs/content/development/extensions.md
index 0159575c922..64fac80964e 100644
--- a/docs/content/development/extensions.md
+++ b/docs/content/development/extensions.md
@@ -47,6 +47,7 @@ If you'd like to take on maintenance for a community extension, please post on [
|druid-rabbitmq|RabbitMQ firehose.|[link](../development/extensions-contrib/rabbitmq.html)|
|druid-rocketmq|RocketMQ firehose.|[link](../development/extensions-contrib/rocketmq.html)|
|graphite-emitter|Graphite metrics emitter|[link](../development/extensions-contrib/graphite.html)|
+|druid-distinctcount|DistinctCount aggregator|[link](../development/extensions-contrib/distinctcount.html)|
## Promoting Community Extension to Core Extension
diff --git a/extensions-contrib/distinctcount/pom.xml b/extensions-contrib/distinctcount/pom.xml
new file mode 100644
index 00000000000..c54c2983af4
--- /dev/null
+++ b/extensions-contrib/distinctcount/pom.xml
@@ -0,0 +1,59 @@
+
+
+
+
+ 4.0.0
+
+ io.druid.extensions.contrib
+ druid-distinctcount
+ druid-distinctcount
+ druid-distinctcount
+
+
+ io.druid
+ druid
+ 0.9.1-SNAPSHOT
+ ../../pom.xml
+
+
+
+
+ io.druid
+ druid-processing
+ ${project.parent.version}
+
+
+
+
+ io.druid
+ druid-processing
+ ${project.parent.version}
+ test
+ test-jar
+
+
+ junit
+ junit
+ test
+
+
+
+
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/BitMapFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/BitMapFactory.java
new file mode 100644
index 00000000000..3c517cd4ae0
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/BitMapFactory.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.metamx.collections.bitmap.MutableBitmap;
+
+/**
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = RoaringBitMapFactory.class)
+@JsonSubTypes(value = {
+ @JsonSubTypes.Type(name = "java", value = JavaBitMapFactory.class),
+ @JsonSubTypes.Type(name = "concise", value = ConciseBitMapFactory.class),
+ @JsonSubTypes.Type(name = "roaring", value = RoaringBitMapFactory.class)
+})
+public interface BitMapFactory
+{
+ /**
+ * Create a new empty bitmap
+ *
+ * @return the new bitmap
+ */
+ public MutableBitmap makeEmptyMutableBitmap();
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/ConciseBitMapFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/ConciseBitMapFactory.java
new file mode 100644
index 00000000000..e9b2d49b7b9
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/ConciseBitMapFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.metamx.collections.bitmap.BitmapFactory;
+import com.metamx.collections.bitmap.ConciseBitmapFactory;
+import com.metamx.collections.bitmap.MutableBitmap;
+
+public class ConciseBitMapFactory implements BitMapFactory
+{
+ private static final BitmapFactory bitmapFactory = new ConciseBitmapFactory();
+
+ public ConciseBitMapFactory() {}
+
+ @Override
+ public MutableBitmap makeEmptyMutableBitmap()
+ {
+ return bitmapFactory.makeEmptyMutableBitmap();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConciseBitMapFactory";
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || o instanceof ConciseBitMapFactory;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return 0;
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregator.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregator.java
new file mode 100644
index 00000000000..f48102ca1f7
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.metamx.collections.bitmap.MutableBitmap;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.segment.DimensionSelector;
+
+public class DistinctCountAggregator implements Aggregator
+{
+
+ private final String name;
+ private final DimensionSelector selector;
+ private final MutableBitmap mutableBitmap;
+
+ public DistinctCountAggregator(
+ String name,
+ DimensionSelector selector,
+ MutableBitmap mutableBitmap
+ )
+ {
+ this.name = name;
+ this.selector = selector;
+ this.mutableBitmap = mutableBitmap;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ for (final Integer index : selector.getRow()) {
+ mutableBitmap.add(index);
+ }
+ }
+
+ @Override
+ public void reset()
+ {
+ mutableBitmap.clear();
+ }
+
+ @Override
+ public Object get()
+ {
+ return mutableBitmap.size();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return (float) mutableBitmap.size();
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public void close()
+ {
+ mutableBitmap.clear();
+ }
+
+ @Override
+ public long getLong()
+ {
+ return (long) mutableBitmap.size();
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java
new file mode 100644
index 00000000000..08b69aae4b3
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountAggregatorFactory.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import com.metamx.common.StringUtils;
+import com.metamx.common.logger.Logger;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.DimensionSelector;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+public class DistinctCountAggregatorFactory extends AggregatorFactory
+{
+ private static final Logger log = new Logger(DistinctCountAggregatorFactory.class);
+ private static final byte CACHE_TYPE_ID = 20;
+ private static final BitMapFactory DEFAULT_BITMAP_FACTORY = new RoaringBitMapFactory();
+
+ private final String name;
+ private final String fieldName;
+ private final BitMapFactory bitMapFactory;
+
+ @JsonCreator
+ public DistinctCountAggregatorFactory(
+ @JsonProperty("name") String name,
+ @JsonProperty("fieldName") String fieldName,
+ @JsonProperty("bitmapFactory") BitMapFactory bitMapFactory
+ )
+ {
+ Preconditions.checkNotNull(name);
+ Preconditions.checkNotNull(fieldName);
+ this.name = name;
+ this.fieldName = fieldName;
+ this.bitMapFactory = bitMapFactory == null ? DEFAULT_BITMAP_FACTORY : bitMapFactory;
+ }
+
+ @Override
+ public Aggregator factorize(ColumnSelectorFactory columnFactory)
+ {
+ DimensionSelector selector = makeDimensionSelector(columnFactory);
+ if (selector == null) {
+ return new EmptyDistinctCountAggregator(name);
+ } else {
+ return new DistinctCountAggregator(
+ name,
+ selector,
+ bitMapFactory.makeEmptyMutableBitmap()
+ );
+ }
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
+ {
+ DimensionSelector selector = makeDimensionSelector(columnFactory);
+ if (selector == null) {
+ return new EmptyDistinctCountBufferAggregator();
+ } else {
+ return new DistinctCountBufferAggregator(makeDimensionSelector(columnFactory));
+ }
+ }
+
+ private DimensionSelector makeDimensionSelector(final ColumnSelectorFactory columnFactory)
+ {
+ return columnFactory.makeDimensionSelector(new DefaultDimensionSpec(fieldName, fieldName));
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return new Comparator()
+ {
+ @Override
+ public int compare(Object o, Object o1)
+ {
+ return Longs.compare(((Number) o).longValue(), ((Number) o1).longValue());
+ }
+ };
+ }
+
+ @Override
+ public Object combine(Object lhs, Object rhs)
+ {
+ if (lhs == null && rhs == null) {
+ return 0L;
+ }
+ if (rhs == null) {
+ return ((Number) lhs).longValue();
+ }
+ if (lhs == null) {
+ return ((Number) rhs).longValue();
+ }
+ return ((Number) lhs).longValue() + ((Number) rhs).longValue();
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ return new LongSumAggregatorFactory(name, name);
+ }
+
+ @Override
+ public List getRequiredColumns()
+ {
+ return Arrays.asList(new DistinctCountAggregatorFactory(fieldName, fieldName, bitMapFactory));
+ }
+
+ @Override
+ public Object deserialize(Object object)
+ {
+ return object;
+ }
+
+ @Override
+ public Object finalizeComputation(Object object)
+ {
+ return object;
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @JsonProperty("bitmap")
+ public BitMapFactory getBitMapFactory()
+ {
+ return bitMapFactory;
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public List requiredFields()
+ {
+ return Arrays.asList(fieldName);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
+ byte[] bitMapFactoryCacheKey = StringUtils.toUtf8(bitMapFactory.toString());
+ return ByteBuffer.allocate(1 + fieldNameBytes.length + bitMapFactoryCacheKey.length)
+ .put(CACHE_TYPE_ID)
+ .put(fieldNameBytes)
+ .put(bitMapFactoryCacheKey)
+ .array();
+ }
+
+ @Override
+ public String getTypeName()
+ {
+ return "distinctCount";
+ }
+
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ return Longs.BYTES;
+ }
+
+ @Override
+ public Object getAggregatorStartValue()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ DistinctCountAggregatorFactory that = (DistinctCountAggregatorFactory) o;
+
+ if (!fieldName.equals(that.fieldName)) {
+ return false;
+ }
+ if (!name.equals(that.name)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = name.hashCode();
+ result = 31 * result + fieldName.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DistinctCountAggregatorFactory{" +
+ "name='" + name + '\'' +
+ ", fieldName='" + fieldName + '\'' +
+ '}';
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountBufferAggregator.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountBufferAggregator.java
new file mode 100644
index 00000000000..a90dd6f2dcc
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountBufferAggregator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.metamx.collections.bitmap.MutableBitmap;
+import com.metamx.collections.bitmap.WrappedRoaringBitmap;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.segment.DimensionSelector;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class DistinctCountBufferAggregator implements BufferAggregator
+{
+ private final DimensionSelector selector;
+ private final Map mutableBitmapCollection = new HashMap<>();
+
+ public DistinctCountBufferAggregator(
+ DimensionSelector selector
+ )
+ {
+ this.selector = selector;
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ buf.putLong(position, 0L);
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ MutableBitmap mutableBitmap = getMutableBitmap(buf, position);
+ for (final Integer index : selector.getRow()) {
+ mutableBitmap.add(index);
+ }
+ buf.putLong(position, mutableBitmap.size());
+ }
+
+ private MutableBitmap getMutableBitmap(ByteBuffer buf, int position)
+ {
+ MutableBitmap mutableBitmap = mutableBitmapCollection.get(position);
+ if (mutableBitmap == null) {
+ mutableBitmap = new WrappedRoaringBitmap();
+ mutableBitmapCollection.put(position, mutableBitmap);
+ }
+ return mutableBitmap;
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return buf.getLong(position);
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ return (float) buf.getLong(position);
+ }
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ return buf.getLong(position);
+ }
+
+ @Override
+ public void close()
+ {
+ mutableBitmapCollection.clear();
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountDruidModule.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountDruidModule.java
new file mode 100644
index 00000000000..faf318a6491
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/DistinctCountDruidModule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class DistinctCountDruidModule implements DruidModule
+{
+ public static final String DISTINCT_COUNT = "distinctCount";
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return ImmutableList.of(
+ new SimpleModule("DistinctCountModule").registerSubtypes(
+ new NamedType(DistinctCountAggregatorFactory.class, DISTINCT_COUNT)
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountAggregator.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountAggregator.java
new file mode 100644
index 00000000000..4828446427a
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountAggregator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import io.druid.query.aggregation.Aggregator;
+
+public class EmptyDistinctCountAggregator implements Aggregator
+{
+
+ private final String name;
+
+ public EmptyDistinctCountAggregator(String name)
+ {
+ this.name = name;
+ }
+
+ @Override
+ public void aggregate()
+ {
+ }
+
+ @Override
+ public void reset()
+ {
+ }
+
+ @Override
+ public Object get()
+ {
+ return 0L;
+ }
+
+ @Override
+ public float getFloat()
+ {
+ return (float) 0;
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
+ public long getLong()
+ {
+ return (long) 0;
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java
new file mode 100644
index 00000000000..0238a8e3906
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/EmptyDistinctCountBufferAggregator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import io.druid.query.aggregation.BufferAggregator;
+
+import java.nio.ByteBuffer;
+
+public class EmptyDistinctCountBufferAggregator implements BufferAggregator
+{
+
+ public EmptyDistinctCountBufferAggregator()
+ {
+ }
+
+ @Override
+ public void init(ByteBuffer buf, int position)
+ {
+ }
+
+ @Override
+ public void aggregate(ByteBuffer buf, int position)
+ {
+ }
+
+ @Override
+ public Object get(ByteBuffer buf, int position)
+ {
+ return 0L;
+ }
+
+ @Override
+ public float getFloat(ByteBuffer buf, int position)
+ {
+ return (float) 0;
+ }
+
+ @Override
+ public long getLong(ByteBuffer buf, int position)
+ {
+ return (long) 0;
+ }
+
+ @Override
+ public void close()
+ {
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/JavaBitMapFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/JavaBitMapFactory.java
new file mode 100644
index 00000000000..5c1f2dc4116
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/JavaBitMapFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.metamx.collections.bitmap.BitSetBitmapFactory;
+import com.metamx.collections.bitmap.BitmapFactory;
+import com.metamx.collections.bitmap.MutableBitmap;
+
+public class JavaBitMapFactory implements BitMapFactory
+{
+ private static final BitmapFactory bitmapFactory = new BitSetBitmapFactory();
+
+ public JavaBitMapFactory() {}
+
+ @Override
+ public MutableBitmap makeEmptyMutableBitmap()
+ {
+ return bitmapFactory.makeEmptyMutableBitmap();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JavaBitMapFactory";
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || o instanceof JavaBitMapFactory;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return 0;
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/RoaringBitMapFactory.java b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/RoaringBitMapFactory.java
new file mode 100644
index 00000000000..0da6cc04e17
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/java/io/druid/query/aggregation/distinctcount/RoaringBitMapFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.metamx.collections.bitmap.BitmapFactory;
+import com.metamx.collections.bitmap.MutableBitmap;
+import com.metamx.collections.bitmap.RoaringBitmapFactory;
+
+public class RoaringBitMapFactory implements BitMapFactory
+{
+ private static final BitmapFactory bitmapFactory = new RoaringBitmapFactory();
+
+ public RoaringBitMapFactory() {}
+
+ @Override
+ public MutableBitmap makeEmptyMutableBitmap()
+ {
+ return bitmapFactory.makeEmptyMutableBitmap();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RoaringBitMapFactory";
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return this == o || o instanceof RoaringBitMapFactory;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return 0;
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-contrib/distinctcount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 00000000000..42447ce0134
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.query.aggregation.distinctcount.DistinctCountDruidModule
diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
new file mode 100644
index 00000000000..2b9ece5ae67
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import io.druid.collections.StupidPool;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.data.input.Row;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.query.dimension.DimensionSpec;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryEngine;
+import io.druid.query.groupby.GroupByQueryQueryToolChest;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
+import io.druid.query.groupby.GroupByQueryRunnerTestHelper;
+import io.druid.query.groupby.orderby.DefaultLimitSpec;
+import io.druid.query.groupby.orderby.OrderByColumnSpec;
+import io.druid.segment.IncrementalIndexSegment;
+import io.druid.segment.Segment;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+public class DistinctCountGroupByQueryTest
+{
+
+ @Test
+ public void testGroupByWithDistinctCountAgg() throws Exception
+ {
+ final ObjectMapper mapper = new DefaultObjectMapper();
+ final StupidPool pool = new StupidPool(
+ new Supplier()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocate(1024 * 1024);
+ }
+ }
+ );
+
+ final GroupByQueryConfig config = new GroupByQueryConfig();
+ config.setMaxIntermediateRows(10000);
+
+ final Supplier configSupplier = Suppliers.ofInstance(config);
+ final GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
+
+ final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
+ engine,
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+ configSupplier,
+ new GroupByQueryQueryToolChest(
+ configSupplier, mapper, engine, pool,
+ QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+ ),
+ pool
+ );
+
+ IncrementalIndex index = new OnheapIncrementalIndex(
+ 0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
+ );
+ String visitor_id = "visitor_id";
+ String client_type = "client_type";
+ long timestamp = System.currentTimeMillis();
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "0", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp + 1,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "1", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp + 2,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "2", client_type, "android")
+ )
+ );
+
+ GroupByQuery query = new GroupByQuery.Builder()
+ .setDataSource(QueryRunnerTestHelper.dataSource)
+ .setGranularity(QueryRunnerTestHelper.allGran)
+ .setDimensions(
+ Arrays.asList(
+ new DefaultDimensionSpec(
+ client_type,
+ client_type
+ )
+ )
+ )
+ .setInterval(QueryRunnerTestHelper.fullOnInterval)
+ .setLimitSpec(
+ new DefaultLimitSpec(
+ Lists.newArrayList(
+ new OrderByColumnSpec(
+ client_type,
+ OrderByColumnSpec.Direction.DESCENDING
+ )
+ ), 10
+ )
+ )
+ .setAggregatorSpecs(
+ Lists.newArrayList(
+ QueryRunnerTestHelper.rowsCount,
+ new DistinctCountAggregatorFactory("UV", visitor_id, null)
+ )
+ )
+ .build();
+ final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
+
+ Iterable results = GroupByQueryRunnerTestHelper.runQuery(
+ factory,
+ factory.createRunner(incrementalIndexSegment),
+ query
+ );
+
+ List expectedResults = Arrays.asList(
+ GroupByQueryRunnerTestHelper.createExpectedRow(
+ "1970-01-01T00:00:00.000Z",
+ client_type, "iphone",
+ "UV", 2L,
+ "rows", 2L
+ ),
+ GroupByQueryRunnerTestHelper.createExpectedRow(
+ "1970-01-01T00:00:00.000Z",
+ client_type, "android",
+ "UV", 1L,
+ "rows", 1L
+ )
+ );
+ TestHelper.assertExpectedObjects(expectedResults, results, "distinct-count");
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
new file mode 100644
index 00000000000..7581c4ed1e1
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.Druids;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class DistinctCountTimeseriesQueryTest
+{
+
+ @Test
+ public void testTopNWithDistinctCountAgg() throws Exception
+ {
+ TimeseriesQueryEngine engine = new TimeseriesQueryEngine();
+
+ IncrementalIndex index = new OnheapIncrementalIndex(
+ 0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
+ );
+ String visitor_id = "visitor_id";
+ String client_type = "client_type";
+ DateTime time = new DateTime("2016-03-04T00:00:00.000Z");
+ long timestamp = time.getMillis();
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "0", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "1", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "2", client_type, "android")
+ )
+ );
+
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .aggregators(
+ Lists.newArrayList(
+ QueryRunnerTestHelper.rowsCount,
+ new DistinctCountAggregatorFactory("UV", visitor_id, null)
+ )
+ )
+ .build();
+
+ final Iterable> results = Sequences.toList(
+ engine.process(
+ query,
+ new IncrementalIndexStorageAdapter(index)
+ ),
+ Lists.>newLinkedList()
+ );
+
+ List> expectedResults = Arrays.asList(
+ new Result<>(
+ time,
+ new TimeseriesResultValue(
+ ImmutableMap.of("UV", 3, "rows", 3L)
+ )
+ )
+ );
+ TestHelper.assertExpectedResults(expectedResults, results);
+ }
+}
diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
new file mode 100644
index 00000000000..561a0909db1
--- /dev/null
+++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.distinctcount;
+
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequences;
+import io.druid.collections.StupidPool;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.granularity.QueryGranularity;
+import io.druid.query.QueryRunnerTestHelper;
+import io.druid.query.Result;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.topn.TopNQuery;
+import io.druid.query.topn.TopNQueryBuilder;
+import io.druid.query.topn.TopNQueryEngine;
+import io.druid.query.topn.TopNResultValue;
+import io.druid.segment.TestHelper;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.IncrementalIndexStorageAdapter;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class DistinctCountTopNQueryTest
+{
+
+ @Test
+ public void testTopNWithDistinctCountAgg() throws Exception
+ {
+ TopNQueryEngine engine = new TopNQueryEngine(
+ new StupidPool(
+ new Supplier()
+ {
+ @Override
+ public ByteBuffer get()
+ {
+ return ByteBuffer.allocate(1024 * 1024);
+ }
+ }
+ )
+ );
+
+ IncrementalIndex index = new OnheapIncrementalIndex(
+ 0, QueryGranularity.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
+ );
+ String visitor_id = "visitor_id";
+ String client_type = "client_type";
+ DateTime time = new DateTime("2016-03-04T00:00:00.000Z");
+ long timestamp = time.getMillis();
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "0", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "1", client_type, "iphone")
+ )
+ );
+ index.add(
+ new MapBasedInputRow(
+ timestamp,
+ Lists.newArrayList(visitor_id, client_type),
+ ImmutableMap.of(visitor_id, "2", client_type, "android")
+ )
+ );
+
+ TopNQuery query = new TopNQueryBuilder().dataSource(QueryRunnerTestHelper.dataSource)
+ .granularity(QueryRunnerTestHelper.allGran)
+ .intervals(QueryRunnerTestHelper.fullOnInterval)
+ .dimension(client_type)
+ .metric("UV")
+ .threshold(10)
+ .aggregators(
+ Lists.newArrayList(
+ QueryRunnerTestHelper.rowsCount,
+ new DistinctCountAggregatorFactory("UV", visitor_id, null)
+ )
+ )
+ .build();
+
+ final Iterable> results = Sequences.toList(
+ engine.query(
+ query,
+ new IncrementalIndexStorageAdapter(index)
+ ),
+ Lists.>newLinkedList()
+ );
+
+ List> expectedResults = Arrays.asList(
+ new Result<>(
+ time,
+ new TopNResultValue(
+ Arrays.