Extract HLL related code to separate module (#3900)

DaimonPl 2017-02-03 18:45:11 +01:00 committed by Gian Merlino
parent 8f4394ca49
commit 93b71e265e
28 changed files with 256 additions and 142 deletions

hll/pom.xml (new file)

@@ -0,0 +1,86 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.3-SNAPSHOT</version>
</parent>
<artifactId>druid-hll</artifactId>
<name>druid-hll</name>
<description>Druid HyperLogLog implementation</description>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0</url>
</license>
</licenses>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>java-util</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.caliper</groupId>
<artifactId>caliper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
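The new module isolates the HyperLogLog collector behind a small, self-contained API. As a rough orientation, a minimal sketch of how the relocated classes are used, mirroring the test code further down in this diff (the class name HllQuickStart is illustrative; assumes druid-hll and Guava on the classpath):

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.druid.hll.HyperLogLogCollector;

public class HllQuickStart
{
  public static void main(String[] args)
  {
    // Hash each value with murmur3_128, as the tests in this commit do,
    // then feed the hashed bytes to the collector.
    HashFunction fn = Hashing.murmur3_128();
    HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
    for (long i = 0; i < 1000; i++) {
      collector.add(fn.hashLong(i).asBytes());
    }
    // estimateCardinality() returns an approximation of the number of distinct inputs.
    System.out.println("estimated cardinality: " + collector.estimateCardinality());
  }
}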

@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
/**
*/

@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
import java.nio.ByteBuffer;

@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
import java.nio.ByteBuffer;

@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.primitives.UnsignedBytes;
@@ -112,6 +112,16 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector>
return (remaining % 3 == 0 || remaining == 1027) ? new HLLCV0(buffer) : new HLLCV1(buffer);
}
+/**
+ * Creates a new collector that shares the other collector's storage buffer (by using {@link ByteBuffer#duplicate()}).
+ *
+ * @param otherCollector collector whose buffer will be shared
+ * @return the new collector backed by the shared buffer
+ */
+public static HyperLogLogCollector makeCollectorSharingStorage(HyperLogLogCollector otherCollector)
+{
+return makeCollector(otherCollector.getStorageBuffer().duplicate());
+}
public static int getLatestNumBytesForDenseStorage()
{
return HLLCV1.NUM_BYTES_FOR_DENSE_STORAGE;
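A brief usage sketch of the new factory method (all names from this diff): the returned collector wraps a duplicate of the other collector's storage, so both observe the same register bytes while keeping independent buffer positions.

// Sketch only: 'view' shares the underlying storage bytes of 'original',
// but has its own ByteBuffer position and limit.
HyperLogLogCollector original = HyperLogLogCollector.makeLatestCollector();
HyperLogLogCollector view = HyperLogLogCollector.makeCollectorSharingStorage(original);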

@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
import com.google.caliper.Param;
import com.google.caliper.Runner;

@@ -17,7 +17,7 @@
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
@@ -34,7 +34,6 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.List;
import java.util.Random;
@@ -696,125 +695,6 @@ public class HyperLogLogCollectorTest
Assert.assertEquals(Double.MAX_VALUE, collector.estimateCardinality(), 1000);
}
-@Test
-public void testCompare1() throws Exception
-{
-HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
-HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
-collector1.add(fn.hashLong(0).asBytes());
-HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
-Comparator comparator = factory.getComparator();
-for (int i = 1; i < 100; i = i + 2) {
-collector1.add(fn.hashLong(i).asBytes());
-collector2.add(fn.hashLong(i + 1).asBytes());
-Assert.assertEquals(1, comparator.compare(collector1, collector2));
-Assert.assertEquals(1, Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()));
-}
-}
-@Test
-public void testCompare2() throws Exception
-{
-Random rand = new Random(0);
-HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
-Comparator comparator = factory.getComparator();
-for (int i = 1; i < 1000; ++i) {
-HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
-int j = rand.nextInt(50);
-for (int l = 0; l < j; ++l) {
-collector1.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
-int k = j + 1 + rand.nextInt(5);
-for (int l = 0; l < k; ++l) {
-collector2.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-Assert.assertEquals(
-Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
-comparator.compare(collector1, collector2)
-);
-}
-for (int i = 1; i < 100; ++i) {
-HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
-int j = rand.nextInt(500);
-for (int l = 0; l < j; ++l) {
-collector1.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
-int k = j + 2 + rand.nextInt(5);
-for (int l = 0; l < k; ++l) {
-collector2.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-Assert.assertEquals(
-Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
-comparator.compare(collector1, collector2)
-);
-}
-for (int i = 1; i < 10; ++i) {
-HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
-int j = rand.nextInt(100000);
-for (int l = 0; l < j; ++l) {
-collector1.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
-int k = j + 20000 + rand.nextInt(100000);
-for (int l = 0; l < k; ++l) {
-collector2.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-Assert.assertEquals(
-Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
-comparator.compare(collector1, collector2)
-);
-}
-}
-@Test
-public void testCompareToShouldBehaveConsistentlyWithEstimatedCardinalitiesEvenInToughCases() throws Exception {
-// given
-Random rand = new Random(0);
-HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
-Comparator comparator = factory.getComparator();
-for (int i = 0; i < 1000; ++i) {
-// given
-HyperLogLogCollector leftCollector = HyperLogLogCollector.makeLatestCollector();
-int j = rand.nextInt(9000) + 5000;
-for (int l = 0; l < j; ++l) {
-leftCollector.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-HyperLogLogCollector rightCollector = HyperLogLogCollector.makeLatestCollector();
-int k = rand.nextInt(9000) + 5000;
-for (int l = 0; l < k; ++l) {
-rightCollector.add(fn.hashLong(rand.nextLong()).asBytes());
-}
-// when
-final int orderedByCardinality = Double.compare(leftCollector.estimateCardinality(),
-rightCollector.estimateCardinality());
-final int orderedByComparator = comparator.compare(leftCollector, rightCollector);
-// then, assert hyperloglog comparator behaves consistently with estimated cardinalities
-Assert.assertEquals(
-String.format("orderedByComparator=%d, orderedByCardinality=%d,\n" +
-"Left={cardinality=%f, hll=%s},\n" +
-"Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality,
-leftCollector.estimateCardinality(), leftCollector,
-rightCollector.estimateCardinality(), rightCollector),
-orderedByCardinality,
-orderedByComparator
-);
-}
-}
@Test
public void testMaxOverflow() {
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();

@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package io.druid.query.aggregation.hyperloglog;
+package io.druid.hll;
import com.carrotsearch.junitbenchmarks.AbstractBenchmark;
import com.carrotsearch.junitbenchmarks.BenchmarkOptions;

@@ -31,9 +31,9 @@ import com.google.common.io.Closeables;
import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;

@@ -23,12 +23,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.jackson.AggregatorsModule;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;

@@ -46,6 +46,7 @@ import io.druid.data.input.InputRow;
import io.druid.data.input.Rows;
import io.druid.granularity.QueryGranularity;
import io.druid.guice.annotations.Smile;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskLock;
@@ -60,7 +61,6 @@ import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.DruidMetrics;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.IOConfig;

@@ -92,6 +92,7 @@
<module>java-util</module>
<module>bytebuffer-collections</module>
<module>extendedset</module>
+<module>hll</module>
<!-- Core extensions -->
<module>extensions-core/avro-extensions</module>
<module>extensions-core/datasketches</module>

@@ -35,6 +35,11 @@
<artifactId>druid-common</artifactId>
<version>${project.parent.version}</version>
</dependency>
+<dependency>
+<groupId>io.druid</groupId>
+<artifactId>druid-hll</artifactId>
+<version>${project.parent.version}</version>
+</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>bytebuffer-collections</artifactId>

@@ -22,10 +22,10 @@ package io.druid.query.aggregation.cardinality;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import java.util.List;

@@ -25,6 +25,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.StringUtils;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.Aggregator;
@@ -34,7 +35,6 @@ import io.druid.query.aggregation.Aggregators;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.dimension.DefaultDimensionSpec;
import io.druid.query.dimension.DimensionSpec;

@@ -19,10 +19,10 @@
package io.druid.query.aggregation.cardinality;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.query.ColumnSelectorPlus;
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import java.nio.ByteBuffer;
import java.util.List;

@@ -20,7 +20,7 @@
package io.druid.query.aggregation.cardinality.types;
import com.google.common.hash.Hasher;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.dimension.ColumnSelectorStrategy;
import io.druid.segment.ColumnValueSelector;

@@ -20,8 +20,8 @@
package io.druid.query.aggregation.cardinality.types;
import com.google.common.hash.Hasher;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.cardinality.CardinalityAggregator;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.data.IndexedInts;
import it.unimi.dsi.fastutil.ints.IntIterator;

@@ -19,6 +19,7 @@
package io.druid.query.aggregation.hyperloglog;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;
@@ -55,7 +56,7 @@ public class HyperUniquesAggregator implements Aggregator
public Object get()
{
// Workaround for OnheapIncrementalIndex's penchant for calling "aggregate" and "get" simultaneously.
-return HyperLogLogCollector.makeCollector(collector.getStorageBuffer().duplicate());
+return HyperLogLogCollector.makeCollectorSharingStorage(collector);
}
@Override
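The workaround leans on a property of ByteBuffer.duplicate() worth spelling out: the duplicate shares content with the source buffer but maintains its own position, limit, and mark, so a reader iterating over the returned collector cannot disturb the aggregator's own cursor. A self-contained sketch (plain java.nio, nothing Druid-specific):

import java.nio.ByteBuffer;

public class DuplicateDemo
{
  public static void main(String[] args)
  {
    ByteBuffer storage = ByteBuffer.allocate(8);
    ByteBuffer view = storage.duplicate();

    view.position(4);                        // moves only the duplicate's cursor
    System.out.println(storage.position());  // 0: the source cursor is untouched

    storage.put(0, (byte) 42);               // content, however, is shared
    System.out.println(view.get(0));         // 42
  }
}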

@@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Ordering;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.StringUtils;
import io.druid.query.aggregation.Aggregator;

@@ -19,6 +19,7 @@
package io.druid.query.aggregation.hyperloglog;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;

@@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.google.common.collect.Ordering;
import com.google.common.hash.HashFunction;
import io.druid.data.input.InputRow;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;

@@ -19,7 +19,7 @@
package io.druid.query.aggregation;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.LongColumnSelector;
import org.junit.Assert;

@@ -22,6 +22,7 @@ package io.druid.query.aggregation.hyperloglog;
import com.google.common.collect.ImmutableMap;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+import io.druid.hll.HyperLogLogCollector;
import org.junit.Assert;
import org.junit.Test;

@@ -19,9 +19,16 @@
package io.druid.query.aggregation.hyperloglog;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+import io.druid.hll.HLLCV0;
+import io.druid.hll.HyperLogLogCollector;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Comparator;
+import java.util.Random;
public class HyperUniquesAggregatorFactoryTest
{
final static HyperUniquesAggregatorFactory aggregatorFactory = new HyperUniquesAggregatorFactory(
@@ -30,10 +37,131 @@ public class HyperUniquesAggregatorFactoryTest
);
final static String V0_BASE64 = "AAYbEyQwFyQVASMCVFEQQgEQIxIhM4ISAQMhUkICEDFDIBMhMgFQFAFAMjAAEhEREyVAEiUBAhIjISATMCECMiERIRIiVRFRAyIAEgFCQSMEJAITATAAEAMQgCEBEjQiAyUTAyEQASJyAGURAAISAwISATETQhAREBYDIVIlFTASAzJgERIgRCcmUyAwNAMyEJMjIhQXQhEWECABQDETATEREjIRAgEyIiMxMBQiAkBBMDYAMEQQACMzMhIkMTQSkYIRABIBADMBAhIEISAENkEBQDAxETMAIEEwEzQiQSEVQSFBBAQDICIiAVIAMTAQIQYBIRABADMDEzEAQSMkEiAYFBAQI0AmECEyQSARRTIVMhEkMiKAMCUBxUghAkIBI3EmMAQiACEAJDJCAAADOzESEDBCRjMgEUQQETQwEWIhA6MlAiAAZDI1AgEIIDUyFDIHMQEEAwIRBRABBStCZCQhAgJSMQIiQEEURTBmM1MxACIAETGhMgQnBRICNiIREyIUNAEAAkABAwQSEBJBIhIhIRERAiIRACUhEUAVMkQGEVMjECYjACBwEQQSIRIgAAEyExQUFSEAIBJCIDIDYTAgMiNBIUADUiETADMoFEADETMCIwUEQkIAESMSIzIABDERIXEhIiACQgUSEgJiQCAUARIRAREDQiEUAkQgAgQiIEAzIxRCARIgBAAVAzMAECEwE0Qh8gAAASEhEiAiMhUxcRImIVABATYyUBAwIoE1QhRDIiYBIBEBEiQSQyERAAADMAARAEACFYUwQSQBIRIgURITARFSEzEHEBACOTMREBIAMjIgEhU0cxEQIRIhIi1wEgMRUBEgMQIRAnAVASURMHQBAiEyBSAAEBQTAWQ5EQA0IUMSISAUEiASIjIhMhMFJBBSEjEAECEwACASEQFBAjARITEQIgYTEKEAeAAiMkEyARowARFBAicRISIBIxAQAgEBARMCIRQgMSIVIAkjMxIAIEMyADASMgFRIjEyKjEjBBIEQCUAARYBEQMxMCIBACNCACRCMlEzUUAAUDM1MhAjEgAxAAISAVFQECAhQAMBMhEzEgASNxAhFRIxECMRJBQAERAToBgQMhJSRQFAEhAwMiIhMQAwAgQiBQJiIGMQQhEiQxR1MiAjIAIEEiAkARECEzQlMjECIRATBgIhEBQAIQAEATEjBCMwAgMBMhAhIyFBIxQAARI1AAEABCIDFBIRUzMBIgAgEiARQCASMQQDQCFBAQAUJwMUElAyIAIRBSIRITICEAIxMAEUBEYTcBMBEEIxMREwIRIDAGIAEgYxBAEANCAhBAI2UhIiIgIRABIEVRAwNEIQERQgEFMhFCQSIAEhQDMTEQMiAjJyEQ==";
+private final HashFunction fn = Hashing.murmur3_128();
@Test
public void testDeserializeV0() throws Exception
{
Object v0 = aggregatorFactory.deserialize(V0_BASE64);
Assert.assertEquals("deserialized value is HLLCV0", HLLCV0.class, v0.getClass());
}
+@Test
+public void testCompare1() throws Exception
+{
+HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
+HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
+collector1.add(fn.hashLong(0).asBytes());
+HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
+Comparator comparator = factory.getComparator();
+for (int i = 1; i < 100; i = i + 2) {
+collector1.add(fn.hashLong(i).asBytes());
+collector2.add(fn.hashLong(i + 1).asBytes());
+Assert.assertEquals(1, comparator.compare(collector1, collector2));
+Assert.assertEquals(1, Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()));
+}
+}
+@Test
+public void testCompare2() throws Exception
+{
+Random rand = new Random(0);
+HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
+Comparator comparator = factory.getComparator();
+for (int i = 1; i < 1000; ++i) {
+HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
+int j = rand.nextInt(50);
+for (int l = 0; l < j; ++l) {
+collector1.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
+int k = j + 1 + rand.nextInt(5);
+for (int l = 0; l < k; ++l) {
+collector2.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+Assert.assertEquals(
+Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
+comparator.compare(collector1, collector2)
+);
+}
+for (int i = 1; i < 100; ++i) {
+HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
+int j = rand.nextInt(500);
+for (int l = 0; l < j; ++l) {
+collector1.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
+int k = j + 2 + rand.nextInt(5);
+for (int l = 0; l < k; ++l) {
+collector2.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+Assert.assertEquals(
+Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
+comparator.compare(collector1, collector2)
+);
+}
+for (int i = 1; i < 10; ++i) {
+HyperLogLogCollector collector1 = HyperLogLogCollector.makeLatestCollector();
+int j = rand.nextInt(100000);
+for (int l = 0; l < j; ++l) {
+collector1.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+HyperLogLogCollector collector2 = HyperLogLogCollector.makeLatestCollector();
+int k = j + 20000 + rand.nextInt(100000);
+for (int l = 0; l < k; ++l) {
+collector2.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+Assert.assertEquals(
+Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
+comparator.compare(collector1, collector2)
+);
+}
+}
+@Test
+public void testCompareToShouldBehaveConsistentlyWithEstimatedCardinalitiesEvenInToughCases() throws Exception {
+// given
+Random rand = new Random(0);
+HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory("foo", "bar");
+Comparator comparator = factory.getComparator();
+for (int i = 0; i < 1000; ++i) {
+// given
+HyperLogLogCollector leftCollector = HyperLogLogCollector.makeLatestCollector();
+int j = rand.nextInt(9000) + 5000;
+for (int l = 0; l < j; ++l) {
+leftCollector.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+HyperLogLogCollector rightCollector = HyperLogLogCollector.makeLatestCollector();
+int k = rand.nextInt(9000) + 5000;
+for (int l = 0; l < k; ++l) {
+rightCollector.add(fn.hashLong(rand.nextLong()).asBytes());
+}
+// when
+final int orderedByCardinality = Double.compare(leftCollector.estimateCardinality(),
+rightCollector.estimateCardinality());
+final int orderedByComparator = comparator.compare(leftCollector, rightCollector);
+// then, assert hyperloglog comparator behaves consistently with estimated cardinalities
+Assert.assertEquals(
+String.format("orderedByComparator=%d, orderedByCardinality=%d,\n" +
+"Left={cardinality=%f, hll=%s},\n" +
+"Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality,
+leftCollector.estimateCardinality(), leftCollector,
+rightCollector.estimateCardinality(), rightCollector),
+orderedByCardinality,
+orderedByComparator
+);
+}
+}
}

@@ -2886,7 +2886,7 @@ public class GroupByQueryRunnerTest
// havingSpec equalTo/greaterThan/lessThan do not work on complex aggregators, even if they could be finalized.
// See also: https://github.com/druid-io/druid/issues/2507
expectedException.expect(ParseException.class);
-expectedException.expectMessage("Unknown type[class io.druid.query.aggregation.hyperloglog.HLLCV1]");
+expectedException.expectMessage("Unknown type[class io.druid.hll.HLLCV1]");
Iterable<Row> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
TestHelper.assertExpectedObjects(expectedResults, results, "order-limit");
}

@@ -41,7 +41,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
@@ -52,8 +51,9 @@ import io.druid.client.selector.ServerSelector;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.PeriodGranularity;
-import io.druid.granularity.QueryGranularity;
import io.druid.granularity.QueryGranularities;
+import io.druid.granularity.QueryGranularity;
+import io.druid.hll.HyperLogLogCollector;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
@@ -78,7 +78,6 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
-import io.druid.query.aggregation.hyperloglog.HyperLogLogCollector;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;
import io.druid.query.aggregation.post.ConstantPostAggregator;

@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HLLCV1;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.Druids;
@@ -39,7 +40,6 @@ import io.druid.query.aggregation.LongMinAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
-import io.druid.query.aggregation.hyperloglog.HLLCV1;
import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.query.aggregation.post.ArithmeticPostAggregator;