mirror of https://github.com/apache/druid.git

pre-computed HLL support for hyperUnique aggregator (#3909)

parent 8854ce018e
commit a2875a4d91
@@ -24,7 +24,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -32,6 +31,7 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -152,7 +152,7 @@ public class FilterPartitionBenchmark
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
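The same one-line swap recurs in every benchmark file below: the serde that used to be built around a raw Guava `HashFunction` is now built around the `HyperLogLogHash` wrapper. A minimal sketch of the shared guarded-registration idiom, using only classes that appear in this diff (the helper class itself is hypothetical):

```java
import io.druid.hll.HyperLogLogHash;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.serde.ComplexMetrics;

// Hypothetical helper mirroring the setup() idiom shared by the benchmarks.
public class HyperUniqueSerdeSetup
{
  public static void register()
  {
    // Serde registration is global; the null check avoids re-registering
    // a type name that is already present.
    if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
    }
  }
}
```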
@@ -22,7 +22,6 @@ package io.druid.benchmark;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -31,6 +30,7 @@ import io.druid.benchmark.query.QueryBenchmarkUtil;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -152,7 +152,7 @@ public class FilteredAggregatorBenchmark
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
@@ -26,7 +26,6 @@ import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -40,6 +39,7 @@ import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
 import io.druid.granularity.QueryGranularity;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -286,7 +286,7 @@ public class GroupByTypeInterfaceBenchmark
     log.info("SETUP CALLED AT %d", System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
     executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]");
 
@@ -22,7 +22,6 @@ package io.druid.benchmark;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -33,6 +32,7 @@ import io.druid.concurrent.Execs;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -241,7 +241,7 @@ public class TopNTypeInterfaceBenchmark
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
@@ -20,14 +20,13 @@
 package io.druid.benchmark.indexing;
 
 import com.google.common.collect.Lists;
-import com.google.common.hash.Hashing;
-
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import io.druid.benchmark.datagen.BenchmarkSchemas;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
@@ -98,7 +97,7 @@ public class IncrementalIndexReadBenchmark
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
@@ -19,14 +19,13 @@
 
 package io.druid.benchmark.indexing;
 
-import com.google.common.hash.Hashing;
-
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import io.druid.benchmark.datagen.BenchmarkSchemas;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import io.druid.segment.incremental.IncrementalIndex;
@@ -76,7 +75,7 @@ public class IndexIngestionBenchmark
   @Setup
   public void setup() throws IOException
   {
-    ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+    ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
 
     rows = new ArrayList<InputRow>();
     schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
@@ -20,7 +20,6 @@
 package io.druid.benchmark.indexing;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -28,6 +27,7 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
@@ -114,7 +114,7 @@ public class IndexMergeBenchmark
     log.info("SETUP CALLED AT " + + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     indexesToMerge = new ArrayList<>();
@@ -20,7 +20,6 @@
 package io.druid.benchmark.indexing;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -28,6 +27,7 @@ import io.druid.benchmark.datagen.BenchmarkSchemas;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
@@ -112,7 +112,7 @@ public class IndexPersistBenchmark
     log.info("SETUP CALLED AT " + + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     rows = new ArrayList<InputRow>();
@@ -27,7 +27,6 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -40,6 +39,7 @@ import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
 import io.druid.granularity.QueryGranularity;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -330,7 +330,7 @@ public class GroupByBenchmark
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
     executorService = Execs.multiThreaded(numProcessingThreads, "GroupByThreadPool[%d]");
 
@@ -25,7 +25,6 @@ import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -35,6 +34,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -317,7 +317,7 @@ public class SearchBenchmark
     log.info("SETUP CALLED AT " + +System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
     executorService = Execs.multiThreaded(numSegments, "SearchThreadPool");
 
@@ -22,9 +22,7 @@ package io.druid.benchmark.query;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
-
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import io.druid.benchmark.datagen.BenchmarkSchemas;
@@ -33,6 +31,7 @@ import io.druid.data.input.InputRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -177,7 +176,7 @@ public class SelectBenchmark
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     executorService = Execs.multiThreaded(numSegments, "SelectThreadPool");
@@ -22,7 +22,6 @@ package io.druid.benchmark.query;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -31,6 +30,7 @@ import io.druid.common.utils.JodaUtils;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Row;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
@@ -114,7 +114,7 @@ public class SqlBenchmark
     log.info("Starting benchmark setup using tmpDir[%s], rows[%,d].", tmpDir, rowsPerSegment);
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
@@ -22,7 +22,6 @@ package io.druid.benchmark.query;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -31,6 +30,7 @@ import io.druid.concurrent.Execs;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -243,7 +243,7 @@ public class TimeseriesBenchmark
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     executorService = Execs.multiThreaded(numSegments, "TimeseriesThreadPool");
@@ -22,7 +22,6 @@ package io.druid.benchmark.query;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.hash.Hashing;
 import com.google.common.io.Files;
 import io.druid.benchmark.datagen.BenchmarkDataGenerator;
 import io.druid.benchmark.datagen.BenchmarkSchemaInfo;
@@ -32,6 +31,7 @@ import io.druid.concurrent.Execs;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.guava.Sequences;
@@ -218,7 +218,7 @@ public class TopNBenchmark
     log.info("SETUP CALLED AT " + System.currentTimeMillis());
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
 
     executorService = Execs.multiThreaded(numSegments, "TopNThreadPool");
@@ -263,9 +263,17 @@ Determine the number of distinct starting characters of last names
 Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to compute the estimated cardinality of a dimension that has been aggregated as a "hyperUnique" metric at indexing time.
 
 ```json
-{ "type" : "hyperUnique", "name" : <output_name>, "fieldName" : <metric_name> }
+{
+  "type" : "hyperUnique",
+  "name" : <output_name>,
+  "fieldName" : <metric_name>,
+  "isInputHyperUnique" : false
+}
 ```
 
+isInputHyperUnique can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected).
+The isInputHyperUnique field only affects ingestion-time behavior, and is ignored at query time.
+
 For more approximate aggregators, please see [theta sketches](../development/extensions-core/datasketches-aggregators.html).
 
 ## Miscellaneous Aggregations
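For context, this is how the new flag would look inside an ingestion spec's metricsSpec: a sketch assuming a source column named preComputedHll that carries Base64-encoded druid-hll sketches (the column name is illustrative, mirroring the test added later in this commit):

```json
"metricsSpec" : [
  {
    "type" : "hyperUnique",
    "name" : "index_hll",
    "fieldName" : "preComputedHll",
    "isInputHyperUnique" : true
  }
]
```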
@@ -0,0 +1,60 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.hll;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import io.druid.java.util.common.StringUtils;
+
+/**
+ * Contains data hashing methods used before applying Hyper Log Log.
+ *
+ * {@link HyperLogLogCollector#add(byte[])} requires hashed value on input. This class makes it easier to achieve
+ * consistent value hashing for both internal aggregation and external hashing for pre-computed aggregation.
+ * <p>
+ * By default 128-bit murmur3 algorithm, x64 variant is used. Implementation is thread safe.
+ * <p>
+ * Caution! changing of implementation may cause improper cardinality estimation between data hashed with different
+ * versions.
+ */
+public class HyperLogLogHash {
+  private static final HyperLogLogHash DEFAULT = new HyperLogLogHash(Hashing.murmur3_128());
+
+  public static HyperLogLogHash getDefault()
+  {
+    return DEFAULT;
+  }
+
+  private final HashFunction hashFunction;
+
+  public HyperLogLogHash(HashFunction hashFunction) {
+    this.hashFunction = hashFunction;
+  }
+
+  public byte[] hash(byte[] rawValue)
+  {
+    return hashFunction.hashBytes(rawValue).asBytes();
+  }
+
+  public byte[] hash(String rawValue)
+  {
+    return hash(StringUtils.toUtf8(rawValue));
+  }
+}
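As a usage note for the class above: an external system can pre-aggregate with the same default hash so that its sketches merge correctly with the ones Druid builds itself. A sketch under the assumption that `HyperLogLogCollector` exposes a `toByteArray()` serializer (this diff only shows `makeLatestCollector()` and `add(byte[])`):

```java
import io.druid.hll.HyperLogLogCollector;
import io.druid.hll.HyperLogLogHash;
import org.apache.commons.codec.binary.Base64;

// Hypothetical producer of the Base64 strings that "isInputHyperUnique"
// ingestion expects (see the sample TSV at the end of this commit).
public class PreComputedHllProducer
{
  public static String toBase64Hll(Iterable<String> values)
  {
    // Use the same default 128-bit murmur3 (x64) hash that Druid applies
    // internally, so external and internal sketches stay compatible.
    HyperLogLogHash hash = HyperLogLogHash.getDefault();
    HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
    for (String value : values) {
      collector.add(hash.hash(value));
    }
    // toByteArray() is assumed here, not shown in this diff.
    return Base64.encodeBase64String(collector.toByteArray());
  }
}
```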
@@ -22,34 +22,35 @@ package io.druid.jackson;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.hash.Hashing;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.CountAggregatorFactory;
 import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
 import io.druid.query.aggregation.DoubleMinAggregatorFactory;
 import io.druid.query.aggregation.DoubleSumAggregatorFactory;
 import io.druid.query.aggregation.FilteredAggregatorFactory;
-import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
 import io.druid.query.aggregation.HistogramAggregatorFactory;
 import io.druid.query.aggregation.JavaScriptAggregatorFactory;
-import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
-import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import io.druid.query.aggregation.LongMaxAggregatorFactory;
 import io.druid.query.aggregation.LongMinAggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
+import io.druid.query.aggregation.first.DoubleFirstAggregatorFactory;
+import io.druid.query.aggregation.first.LongFirstAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;
+import io.druid.query.aggregation.last.DoubleLastAggregatorFactory;
 import io.druid.query.aggregation.last.LongLastAggregatorFactory;
 import io.druid.query.aggregation.post.ArithmeticPostAggregator;
 import io.druid.query.aggregation.post.ConstantPostAggregator;
 import io.druid.query.aggregation.post.DoubleGreatestPostAggregator;
 import io.druid.query.aggregation.post.DoubleLeastPostAggregator;
+import io.druid.query.aggregation.post.ExpressionPostAggregator;
 import io.druid.query.aggregation.post.FieldAccessPostAggregator;
 import io.druid.query.aggregation.post.JavaScriptPostAggregator;
-import io.druid.query.aggregation.post.ExpressionPostAggregator;
 import io.druid.query.aggregation.post.LongGreatestPostAggregator;
 import io.druid.query.aggregation.post.LongLeastPostAggregator;
 import io.druid.segment.serde.ComplexMetrics;
@@ -63,7 +64,11 @@ public class AggregatorsModule extends SimpleModule
     super("AggregatorFactories");
 
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
    }
 
+    if (ComplexMetrics.getSerdeForType("preComputedHyperUnique") == null) {
+      ComplexMetrics.registerSerde("preComputedHyperUnique", new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault()));
+    }
+
     setMixInAnnotation(AggregatorFactory.class, AggregatorFactoryMixin.class);
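Because the factory's @JsonCreator constructor (next file) accepts the new property, the module wiring above lets a JSON aggregator spec carry the flag directly. A sketch, assuming DefaultObjectMapper registers AggregatorsModule as it does in the benchmarks earlier in this commit:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;

public class AggregatorSpecExample
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();

    // The polymorphic "type": "hyperUnique" resolves through the mix-in
    // registered above; the flag routes the column to the new serde.
    AggregatorFactory factory = mapper.readValue(
        "{\"type\": \"hyperUnique\", \"name\": \"index_hll\","
        + " \"fieldName\": \"preComputedHll\", \"isInputHyperUnique\": true}",
        AggregatorFactory.class
    );

    System.out.println(factory.getTypeName()); // "preComputedHyperUnique"
  }
}
```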
@@ -38,6 +38,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
 
 /**
  */
@@ -56,15 +57,26 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
 
   private final String name;
   private final String fieldName;
+  private final boolean isInputHyperUnique;
 
   @JsonCreator
   public HyperUniquesAggregatorFactory(
       @JsonProperty("name") String name,
-      @JsonProperty("fieldName") String fieldName
+      @JsonProperty("fieldName") String fieldName,
+      @JsonProperty("isInputHyperUnique") Boolean isInputHyperUnique
   )
   {
     this.name = name;
     this.fieldName = fieldName;
+    this.isInputHyperUnique = (isInputHyperUnique == null) ? false : isInputHyperUnique;
+  }
+
+  public HyperUniquesAggregatorFactory(
+      String name,
+      String fieldName
+  )
+  {
+    this(name, fieldName, false);
   }
 
   @Override
@@ -126,7 +138,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
   @Override
   public AggregatorFactory getCombiningFactory()
   {
-    return new HyperUniquesAggregatorFactory(name, name);
+    return new HyperUniquesAggregatorFactory(name, name, false);
   }
 
   @Override
@@ -142,7 +154,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
   @Override
   public List<AggregatorFactory> getRequiredColumns()
   {
-    return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName));
+    return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(fieldName, fieldName, isInputHyperUnique));
   }
 
   @Override
@@ -165,7 +177,6 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
   }
 
   @Override
-
   public Object finalizeComputation(Object object)
   {
     return estimateCardinality(object);
@@ -190,6 +201,12 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
     return fieldName;
   }
 
+  @JsonProperty
+  public boolean getIsInputHyperUnique()
+  {
+    return isInputHyperUnique;
+  }
+
   @Override
   public byte[] getCacheKey()
   {
@@ -201,7 +218,11 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
   @Override
   public String getTypeName()
   {
-    return "hyperUnique";
+    if (isInputHyperUnique) {
+      return "preComputedHyperUnique";
+    } else {
+      return "hyperUnique";
+    }
   }
 
   @Override
@@ -216,6 +237,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
     return "HyperUniquesAggregatorFactory{" +
            "name='" + name + '\'' +
            ", fieldName='" + fieldName + '\'' +
+           ", isInputHyperUnique=" + isInputHyperUnique +
            '}';
   }
 
@@ -231,21 +253,13 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
 
     HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o;
 
-    if (!fieldName.equals(that.fieldName)) {
-      return false;
-    }
-    if (!name.equals(that.name)) {
-      return false;
-    }
-
-    return true;
+    return Objects.equals(fieldName, that.fieldName) && Objects.equals(name, that.name) &&
+           Objects.equals(isInputHyperUnique, that.isInputHyperUnique);
   }
 
   @Override
   public int hashCode()
   {
-    int result = name.hashCode();
-    result = 31 * result + fieldName.hashCode();
-    return result;
+    return Objects.hash(name, fieldName, isInputHyperUnique);
   }
 }
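The getTypeName() branch above is the hinge of the feature: with the flag set, the factory reports its column type as preComputedHyperUnique, so ComplexMetrics resolves the PreComputedHyperUniquesSerde registered in AggregatorsModule instead of the hashing serde. A small sketch using only constructors and methods from this diff (field names are illustrative):

```java
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;

public class TypeNameRouting
{
  public static void main(String[] args)
  {
    // Raw input: values are hashed at ingestion time.
    HyperUniquesAggregatorFactory raw =
        new HyperUniquesAggregatorFactory("uniques", "user_id", false);
    System.out.println(raw.getTypeName()); // hyperUnique

    // Pre-computed input: Base64 HLL sketches are folded, not hashed.
    HyperUniquesAggregatorFactory pre =
        new HyperUniquesAggregatorFactory("uniques", "user_hll", true);
    System.out.println(pre.getTypeName()); // preComputedHyperUnique
  }
}
```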
@@ -20,10 +20,9 @@
 package io.druid.query.aggregation.hyperloglog;
 
 import com.google.common.collect.Ordering;
-import com.google.common.hash.HashFunction;
 import io.druid.data.input.InputRow;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.hll.HyperLogLogCollector;
-import io.druid.java.util.common.StringUtils;
 import io.druid.segment.column.ColumnBuilder;
 import io.druid.segment.data.GenericIndexed;
 import io.druid.segment.data.ObjectStrategy;
@@ -34,8 +33,6 @@ import io.druid.segment.serde.ComplexMetricSerde;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-/**
- */
 public class HyperUniquesSerde extends ComplexMetricSerde
 {
   private static Ordering<HyperLogLogCollector> comparator = new Ordering<HyperLogLogCollector>()
@@ -49,13 +46,11 @@ public class HyperUniquesSerde extends ComplexMetricSerde
     }
   }.nullsFirst();
 
-  private final HashFunction hashFn;
+  private final HyperLogLogHash hyperLogLogHash;
 
-  public HyperUniquesSerde(
-      HashFunction hashFn
-  )
+  public HyperUniquesSerde(HyperLogLogHash hyperLogLogHash)
   {
-    this.hashFn = hashFn;
+    this.hyperLogLogHash = hyperLogLogHash;
   }
 
   @Override
@@ -91,9 +86,7 @@ public class HyperUniquesSerde extends ComplexMetricSerde
       }
 
       for (String dimensionValue : dimValues) {
-        collector.add(
-            hashFn.hashBytes(StringUtils.toUtf8(dimensionValue)).asBytes()
-        );
+        collector.add(hyperLogLogHash.hash(dimensionValue));
       }
       return collector;
     }
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.hyperloglog;
+
+import io.druid.data.input.InputRow;
+import io.druid.hll.HyperLogLogCollector;
+import io.druid.hll.HyperLogLogHash;
+import io.druid.java.util.common.ISE;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import org.apache.commons.codec.binary.Base64;
+
+import java.nio.ByteBuffer;
+
+public class PreComputedHyperUniquesSerde extends HyperUniquesSerde
+{
+  public PreComputedHyperUniquesSerde(HyperLogLogHash hyperLogLogHash)
+  {
+    super(hyperLogLogHash);
+  }
+
+  @Override
+  public ComplexMetricExtractor getExtractor()
+  {
+    return new ComplexMetricExtractor()
+    {
+      @Override
+      public Class<HyperLogLogCollector> extractedClass()
+      {
+        return HyperLogLogCollector.class;
+      }
+
+      @Override
+      public HyperLogLogCollector extractValue(InputRow inputRow, String metricName)
+      {
+        Object rawValue = inputRow.getRaw(metricName);
+
+        if (rawValue == null)
+        {
+          return HyperLogLogCollector.makeLatestCollector();
+        } else if (rawValue instanceof HyperLogLogCollector)
+        {
+          return (HyperLogLogCollector) rawValue;
+        } else if (rawValue instanceof byte[])
+        {
+          return HyperLogLogCollector.makeLatestCollector().fold(ByteBuffer.wrap((byte[]) rawValue));
+        } else if (rawValue instanceof String)
+        {
+          return HyperLogLogCollector.makeLatestCollector().fold(ByteBuffer.wrap(Base64.decodeBase64((String) rawValue)));
+        }
+
+        throw new ISE("Object is not of a type[%s] that can be deserialized to HyperLogLog.", rawValue.getClass());
+      }
+    };
+  }
+}
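To illustrate what the extractor above accepts: null yields an empty collector, while a collector, raw bytes, or a Base64 string are folded as-is. A sketch, assuming the (timestamp, dimensions, event) MapBasedInputRow constructor used elsewhere in this commit; the Base64 literal is copied from the sample TSV below:

```java
import com.google.common.collect.ImmutableMap;
import io.druid.data.input.MapBasedInputRow;
import io.druid.hll.HyperLogLogCollector;
import io.druid.hll.HyperLogLogHash;
import io.druid.query.aggregation.hyperloglog.PreComputedHyperUniquesSerde;

import java.util.Collections;

public class PreComputedExtractionExample
{
  public static void main(String[] args)
  {
    PreComputedHyperUniquesSerde serde =
        new PreComputedHyperUniquesSerde(HyperLogLogHash.getDefault());

    // A row whose metric column already holds a Base64-encoded sketch.
    MapBasedInputRow row = new MapBasedInputRow(
        0L,
        Collections.<String>emptyList(),
        ImmutableMap.<String, Object>of("preComputedHll", "AQAAAgAAAAAcAQFhIA==")
    );

    // The extractor decodes the string and folds it into a fresh collector.
    HyperLogLogCollector collector =
        (HyperLogLogCollector) serde.getExtractor().extractValue(row, "preComputedHll");
  }
}
```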
@@ -123,4 +123,66 @@ public class HyperUniquesAggregationTest
     Assert.assertEquals(3.0, row.getFloatMetric("index_hll"), 0.1);
     Assert.assertEquals(3.0, row.getFloatMetric("index_unique_count"), 0.1);
   }
+
+  @Test
+  public void testIngestAndQueryPrecomputedHll() throws Exception
+  {
+    AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+        Lists.newArrayList(new AggregatorsModule()),
+        config,
+        tempFolder
+    );
+
+    String metricSpec = "[{"
+                        + "\"type\": \"hyperUnique\","
+                        + "\"name\": \"index_hll\","
+                        + "\"fieldName\": \"preComputedHll\","
+                        + "\"isInputHyperUnique\": true"
+                        + "}]";
+
+    String parseSpec = "{"
+                       + "\"type\" : \"string\","
+                       + "\"parseSpec\" : {"
+                       + "    \"format\" : \"tsv\","
+                       + "    \"timestampSpec\" : {"
+                       + "        \"column\" : \"timestamp\","
+                       + "        \"format\" : \"auto\""
+                       + "},"
+                       + "    \"dimensionsSpec\" : {"
+                       + "        \"dimensions\": [],"
+                       + "        \"dimensionExclusions\" : [],"
+                       + "        \"spatialDimensions\" : []"
+                       + "    },"
+                       + "    \"columns\": [\"timestamp\", \"market\", \"preComputedHll\"]"
+                       + "  }"
+                       + "}";
+
+    String query = "{"
+                   + "\"queryType\": \"groupBy\","
+                   + "\"dataSource\": \"test_datasource\","
+                   + "\"granularity\": \"ALL\","
+                   + "\"dimensions\": [],"
+                   + "\"aggregations\": ["
+                   + "  { \"type\": \"hyperUnique\", \"name\": \"index_hll\", \"fieldName\": \"index_hll\" }"
+                   + "],"
+                   + "\"postAggregations\": ["
+                   + "  { \"type\": \"hyperUniqueCardinality\", \"name\": \"index_unique_count\", \"fieldName\": \"index_hll\" }"
+                   + "],"
+                   + "\"intervals\": [ \"1970/2050\" ]"
+                   + "}";
+
+    Sequence seq = helper.createIndexAndRunQueryOnSegment(
+        new File(this.getClass().getClassLoader().getResource("druid.hll.sample.tsv").getFile()),
+        parseSpec,
+        metricSpec,
+        0,
+        QueryGranularities.DAY,
+        50000,
+        query
+    );
+
+    MapBasedRow row = (MapBasedRow) Sequences.toList(seq, Lists.newArrayList()).get(0);
+    Assert.assertEquals(4.0, row.getFloatMetric("index_hll"), 0.1);
+    Assert.assertEquals(4.0, row.getFloatMetric("index_unique_count"), 0.1);
+  }
 }
@@ -27,9 +27,9 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
-import com.google.common.hash.Hashing;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.logger.Logger;
@@ -94,7 +94,7 @@ public class SchemalessIndexTest
 
   static {
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
   }
 
@@ -21,7 +21,6 @@ package io.druid.segment;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
-import com.google.common.hash.Hashing;
 import com.google.common.io.CharSource;
 import com.google.common.io.LineProcessor;
 import com.google.common.io.Resources;
@@ -30,6 +29,7 @@ import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.StringInputRowParser;
 import io.druid.data.input.impl.TimestampSpec;
 import io.druid.granularity.QueryGranularities;
+import io.druid.hll.HyperLogLogHash;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.DoubleMaxAggregatorFactory;
@@ -104,7 +104,7 @@ public class TestIndex
 
   static {
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
-      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(Hashing.murmur3_128()));
+      ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde(HyperLogLogHash.getDefault()));
     }
   }
 
@@ -0,0 +1,3 @@
+2011-01-12T00:00:00.000Z	spot	AQAAAgAAAAAcAQFhIA==
+2011-01-12T00:00:00.000Z	total_market	AQAAAgAAAAFhIAGIBA==
+2011-01-12T00:00:00.000Z	etc	AQAAAgAAAAFhIAKWBA==