mirror of https://github.com/apache/druid.git
Add "round" option to cardinality and hyperUnique aggregators. (#4720)
* Add "round" option to cardinality and hyperUnique aggregators. Also turn it on by default in SQL, to make math on distinct counts work more as expected. * Fix some compile errors. * Fix test. * Formatting.
This commit is contained in:
parent
9fbfc1be32
commit
daf3c5f927
|
@ -227,12 +227,17 @@ instead of the cardinality aggregator if you do not care about the individual va
|
|||
"type": "cardinality",
|
||||
"name": "<output_name>",
|
||||
"fields": [ <dimension1>, <dimension2>, ... ],
|
||||
"byRow": <false | true> # (optional, defaults to false)
|
||||
"byRow": <false | true> # (optional, defaults to false),
|
||||
"round": <false | true> # (optional, defaults to false)
|
||||
}
|
||||
```
|
||||
|
||||
Each individual element of the "fields" list can be a String or [DimensionSpec](../querying/dimensionspecs.html). A String dimension in the fields list is equivalent to a DefaultDimensionSpec (no transformations).
|
||||
|
||||
The HyperLogLog algorithm generates decimal estimates with some error. "round" can be set to true to round off estimated
|
||||
values to whole numbers. Note that even with rounding, the cardinality is still an estimate. The "round" field only
|
||||
affects query-time behavior, and is ignored at ingestion-time.
|
||||
|
||||
#### Cardinality by value
|
||||
|
||||
When setting `byRow` to `false` (the default) it computes the cardinality of the set composed of the union of all dimension values for all the given dimensions.
|
||||
|
@ -315,12 +320,17 @@ Uses [HyperLogLog](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf) to
|
|||
"type" : "hyperUnique",
|
||||
"name" : <output_name>,
|
||||
"fieldName" : <metric_name>,
|
||||
"isInputHyperUnique" : false
|
||||
"isInputHyperUnique" : false,
|
||||
"round" : false
|
||||
}
|
||||
```
|
||||
|
||||
isInputHyperUnique can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected).
|
||||
The isInputHyperUnique field only affects ingestion-time behavior, and is ignored at query time.
|
||||
"isInputHyperUnique" can be set to true to index pre-computed HLL (Base64 encoded output from druid-hll is expected).
|
||||
The "isInputHyperUnique" field only affects ingestion-time behavior, and is ignored at query-time.
|
||||
|
||||
The HyperLogLog algorithm generates decimal estimates with some error. "round" can be set to true to round off estimated
|
||||
values to whole numbers. Note that even with rounding, the cardinality is still an estimate. The "round" field only
|
||||
affects query-time behavior, and is ignored at ingestion-time.
|
||||
|
||||
For more approximate aggregators, please see [theta sketches](../development/extensions-core/datasketches-aggregators.html).
|
||||
|
||||
|
|
|
@ -107,7 +107,11 @@ JavaScript-based functionality is disabled by default. Please refer to the Druid
|
|||
The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations.
|
||||
|
||||
```json
|
||||
{ "type" : "hyperUniqueCardinality", "name": <output name>, "fieldName" : <the name field value of the hyperUnique aggregator>}
|
||||
{
|
||||
"type" : "hyperUniqueCardinality",
|
||||
"name": <output name>,
|
||||
"fieldName" : <the name field value of the hyperUnique aggregator>
|
||||
}
|
||||
```
|
||||
|
||||
It can be used in a sample calculation as so:
|
||||
|
@ -128,6 +132,10 @@ It can be used in a sample calculation as so:
|
|||
}]
|
||||
```
|
||||
|
||||
This post-aggregator will inherit the rounding behavior of the aggregator it references. Note that this inheritance
|
||||
is only effective if you directly reference an aggregator. Going through another post-aggregator, for example, will
|
||||
cause the user-specified rounding behavior to get lost and default to "no rounding".
|
||||
|
||||
## Example Usage
|
||||
|
||||
In this example, let’s calculate a simple percentage using post aggregators. Let’s imagine our data set has a metric called "total".
|
||||
|
|
|
@ -502,6 +502,11 @@ public abstract class HyperLogLogCollector implements Comparable<HyperLogLogColl
|
|||
return theBytes;
|
||||
}
|
||||
|
||||
public long estimateCardinalityRound()
|
||||
{
|
||||
return Math.round(estimateCardinality());
|
||||
}
|
||||
|
||||
public double estimateCardinality()
|
||||
{
|
||||
if (estimatedCardinality == null) {
|
||||
|
|
|
@ -344,7 +344,7 @@ public class DetermineHashedPartitionsJob implements Jobby
|
|||
}
|
||||
).writeValue(
|
||||
out,
|
||||
new Double(aggregate.estimateCardinality()).longValue()
|
||||
aggregate.estimateCardinalityRound()
|
||||
);
|
||||
}
|
||||
finally {
|
||||
|
|
|
@ -425,7 +425,7 @@ public class BatchDeltaIngestionTest
|
|||
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
|
||||
Assert.assertEquals(
|
||||
(Double) expected.get("unique_hosts"),
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
|
||||
0.001
|
||||
);
|
||||
}
|
||||
|
|
|
@ -187,7 +187,14 @@ public class IndexGeneratorCombinerTest
|
|||
Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host"));
|
||||
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords"));
|
||||
Assert.assertEquals(15, capturedRow.getLongMetric("visited_sum"));
|
||||
Assert.assertEquals(2.0, (Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow.getRaw("unique_hosts")), 0.001);
|
||||
Assert.assertEquals(
|
||||
2.0,
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(
|
||||
capturedRow.getRaw("unique_hosts"),
|
||||
false
|
||||
),
|
||||
0.001
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -250,13 +257,21 @@ public class IndexGeneratorCombinerTest
|
|||
Assert.assertEquals(Collections.singletonList("host1"), capturedRow1.getDimension("host"));
|
||||
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords"));
|
||||
Assert.assertEquals(10, capturedRow1.getLongMetric("visited_sum"));
|
||||
Assert.assertEquals(1.0, (Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts")), 0.001);
|
||||
Assert.assertEquals(
|
||||
1.0,
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow1.getRaw("unique_hosts"), false),
|
||||
0.001
|
||||
);
|
||||
|
||||
InputRow capturedRow2 = InputRowSerde.fromBytes(captureVal2.getValue().getBytes(), aggregators);
|
||||
Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions());
|
||||
Assert.assertEquals(Collections.singletonList("host2"), capturedRow2.getDimension("host"));
|
||||
Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords"));
|
||||
Assert.assertEquals(5, capturedRow2.getLongMetric("visited_sum"));
|
||||
Assert.assertEquals(1.0, (Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts")), 0.001);
|
||||
Assert.assertEquals(
|
||||
1.0,
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(capturedRow2.getRaw("unique_hosts"), false),
|
||||
0.001
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ public class DatasourceRecordReaderTest
|
|||
Assert.assertEquals(expected.get("visited_sum"), actual.getLongMetric("visited_sum"));
|
||||
Assert.assertEquals(
|
||||
(Double) expected.get("unique_hosts"),
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts")),
|
||||
(Double) HyperUniquesAggregatorFactory.estimateCardinality(actual.getRaw("unique_hosts"), false),
|
||||
0.001
|
||||
);
|
||||
}
|
||||
|
|
|
@ -377,7 +377,7 @@ public class IndexTask extends AbstractTask
|
|||
|
||||
final int numShards;
|
||||
if (determineNumPartitions) {
|
||||
final long numRows = new Double(collector.estimateCardinality()).longValue();
|
||||
final long numRows = collector.estimateCardinalityRound();
|
||||
numShards = (int) Math.ceil((double) numRows / tuningConfig.getTargetPartitionSize());
|
||||
log.info("Estimated [%,d] rows of data for interval [%s], creating [%,d] shards", numRows, interval, numShards);
|
||||
} else {
|
||||
|
|
|
@ -39,6 +39,7 @@ import io.druid.query.aggregation.NoopBufferAggregator;
|
|||
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
|
||||
import io.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
|
||||
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
|
||||
import io.druid.query.cache.CacheKeyBuilder;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import io.druid.query.dimension.DimensionSpec;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
|
@ -46,10 +47,11 @@ import io.druid.segment.DimensionHandlerUtils;
|
|||
import org.apache.commons.codec.binary.Base64;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class CardinalityAggregatorFactory extends AggregatorFactory
|
||||
{
|
||||
|
@ -87,28 +89,21 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
);
|
||||
}
|
||||
|
||||
public static Object estimateCardinality(Object object)
|
||||
{
|
||||
if (object == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return ((HyperLogLogCollector) object).estimateCardinality();
|
||||
}
|
||||
|
||||
private static final CardinalityAggregatorColumnSelectorStrategyFactory STRATEGY_FACTORY =
|
||||
new CardinalityAggregatorColumnSelectorStrategyFactory();
|
||||
|
||||
private final String name;
|
||||
private final List<DimensionSpec> fields;
|
||||
private final boolean byRow;
|
||||
private final boolean round;
|
||||
|
||||
@JsonCreator
|
||||
public CardinalityAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@Deprecated @JsonProperty("fieldNames") final List<String> fieldNames,
|
||||
@JsonProperty("fields") final List<DimensionSpec> fields,
|
||||
@JsonProperty("byRow") final boolean byRow
|
||||
@JsonProperty("byRow") final boolean byRow,
|
||||
@JsonProperty("round") final boolean round
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
|
@ -123,6 +118,7 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
this.fields = fields;
|
||||
}
|
||||
this.byRow = byRow;
|
||||
this.round = round;
|
||||
}
|
||||
|
||||
public CardinalityAggregatorFactory(
|
||||
|
@ -131,7 +127,7 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
final boolean byRow
|
||||
)
|
||||
{
|
||||
this(name, null, fields, byRow);
|
||||
this(name, null, fields, byRow, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -201,7 +197,7 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return new HyperUniquesAggregatorFactory(name, name);
|
||||
return new HyperUniquesAggregatorFactory(name, name, false, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -213,17 +209,18 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public List<AggregatorFactory> getRequiredColumns()
|
||||
{
|
||||
return Lists.transform(
|
||||
fields,
|
||||
new Function<DimensionSpec, AggregatorFactory>()
|
||||
{
|
||||
@Override
|
||||
public AggregatorFactory apply(DimensionSpec input)
|
||||
{
|
||||
return new CardinalityAggregatorFactory(input.getOutputName(), Collections.singletonList(input), byRow);
|
||||
}
|
||||
}
|
||||
);
|
||||
return fields.stream()
|
||||
.map(
|
||||
field ->
|
||||
new CardinalityAggregatorFactory(
|
||||
field.getOutputName(),
|
||||
null,
|
||||
Collections.singletonList(field),
|
||||
byRow,
|
||||
round
|
||||
)
|
||||
)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -249,7 +246,7 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return estimateCardinality(object);
|
||||
return HyperUniquesAggregatorFactory.estimateCardinality(object, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -277,25 +274,20 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
return byRow;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isRound()
|
||||
{
|
||||
return round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
List<byte[]> dimSpecKeys = new ArrayList<>();
|
||||
int dimSpecKeysLength = fields.size();
|
||||
for (DimensionSpec dimSpec : fields) {
|
||||
byte[] dimSpecKey = dimSpec.getCacheKey();
|
||||
dimSpecKeysLength += dimSpecKey.length;
|
||||
dimSpecKeys.add(dimSpec.getCacheKey());
|
||||
}
|
||||
|
||||
ByteBuffer retBuf = ByteBuffer.allocate(2 + dimSpecKeysLength);
|
||||
retBuf.put(AggregatorUtil.CARD_CACHE_TYPE_ID);
|
||||
for (byte[] dimSpecKey : dimSpecKeys) {
|
||||
retBuf.put(dimSpecKey);
|
||||
retBuf.put(AggregatorUtil.STRING_SEPARATOR);
|
||||
}
|
||||
retBuf.put((byte) (byRow ? 1 : 0));
|
||||
return retBuf.array();
|
||||
return new CacheKeyBuilder(AggregatorUtil.CARD_CACHE_TYPE_ID)
|
||||
.appendCacheables(fields)
|
||||
.appendBoolean(byRow)
|
||||
.appendBoolean(round)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -311,7 +303,7 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
public boolean equals(final Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
|
@ -319,26 +311,17 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;
|
||||
|
||||
if (isByRow() != that.isByRow()) {
|
||||
return false;
|
||||
}
|
||||
if (!getName().equals(that.getName())) {
|
||||
return false;
|
||||
}
|
||||
return getFields().equals(that.getFields());
|
||||
|
||||
final CardinalityAggregatorFactory that = (CardinalityAggregatorFactory) o;
|
||||
return byRow == that.byRow &&
|
||||
round == that.round &&
|
||||
Objects.equals(name, that.name) &&
|
||||
Objects.equals(fields, that.fields);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
int result = getName().hashCode();
|
||||
result = 31 * result + getFields().hashCode();
|
||||
result = 31 * result + (isByRow() ? 1 : 0);
|
||||
return result;
|
||||
return Objects.hash(name, fields, byRow, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -346,7 +329,9 @@ public class CardinalityAggregatorFactory extends AggregatorFactory
|
|||
{
|
||||
return "CardinalityAggregatorFactory{" +
|
||||
"name='" + name + '\'' +
|
||||
", fields='" + fields + '\'' +
|
||||
", fields=" + fields +
|
||||
", byRow=" + byRow +
|
||||
", round=" + round +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,18 +48,28 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
|
|||
|
||||
private final String name;
|
||||
private final String fieldName;
|
||||
private final AggregatorFactory aggregatorFactory;
|
||||
|
||||
@JsonCreator
|
||||
public HyperUniqueFinalizingPostAggregator(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") String fieldName
|
||||
)
|
||||
{
|
||||
this(name, fieldName, null);
|
||||
}
|
||||
|
||||
private HyperUniqueFinalizingPostAggregator(
|
||||
String name,
|
||||
String fieldName,
|
||||
AggregatorFactory aggregatorFactory
|
||||
)
|
||||
{
|
||||
this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null");
|
||||
//Note that, in general, name shouldn't be null, we are defaulting
|
||||
//to fieldName here just to be backward compatible with 0.7.x
|
||||
this.name = name == null ? fieldName : name;
|
||||
|
||||
this.aggregatorFactory = aggregatorFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,7 +87,16 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
|
|||
@Override
|
||||
public Object compute(Map<String, Object> combinedAggregators)
|
||||
{
|
||||
return HyperUniquesAggregatorFactory.estimateCardinality(combinedAggregators.get(fieldName));
|
||||
final Object collector = combinedAggregators.get(fieldName);
|
||||
|
||||
if (aggregatorFactory == null) {
|
||||
// This didn't come directly from an aggregator. Maybe it came through a FieldAccessPostAggregator or
|
||||
// something like that. Hope it's a HyperLogLogCollector, and estimate it without rounding.
|
||||
return HyperUniquesAggregatorFactory.estimateCardinality(collector, false);
|
||||
} else {
|
||||
// Delegate to the aggregator factory to get the user-specified rounding behavior.
|
||||
return aggregatorFactory.finalizeComputation(collector);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,7 +109,8 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
|
|||
@Override
|
||||
public HyperUniqueFinalizingPostAggregator decorate(Map<String, AggregatorFactory> aggregators)
|
||||
{
|
||||
return this;
|
||||
final AggregatorFactory theAggregatorFactory = aggregators != null ? aggregators.get(fieldName) : null;
|
||||
return new HyperUniqueFinalizingPostAggregator(name, fieldName, theAggregatorFactory);
|
||||
}
|
||||
|
||||
@JsonProperty("fieldName")
|
||||
|
|
|
@ -34,6 +34,7 @@ import io.druid.query.aggregation.BufferAggregator;
|
|||
import io.druid.query.aggregation.NoopAggregator;
|
||||
import io.druid.query.aggregation.NoopBufferAggregator;
|
||||
import io.druid.query.aggregation.cardinality.HyperLogLogCollectorAggregateCombiner;
|
||||
import io.druid.query.cache.CacheKeyBuilder;
|
||||
import io.druid.segment.ColumnSelectorFactory;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
|
@ -49,29 +50,39 @@ import java.util.Objects;
|
|||
*/
|
||||
public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
||||
{
|
||||
public static Object estimateCardinality(Object object)
|
||||
public static Object estimateCardinality(Object object, boolean round)
|
||||
{
|
||||
if (object == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return ((HyperLogLogCollector) object).estimateCardinality();
|
||||
final HyperLogLogCollector collector = (HyperLogLogCollector) object;
|
||||
|
||||
// Avoid ternary, it causes estimateCardinalityRound to be cast to double.
|
||||
if (round) {
|
||||
return collector.estimateCardinalityRound();
|
||||
} else {
|
||||
return collector.estimateCardinality();
|
||||
}
|
||||
}
|
||||
|
||||
private final String name;
|
||||
private final String fieldName;
|
||||
private final boolean isInputHyperUnique;
|
||||
private final boolean round;
|
||||
|
||||
@JsonCreator
|
||||
public HyperUniquesAggregatorFactory(
|
||||
@JsonProperty("name") String name,
|
||||
@JsonProperty("fieldName") String fieldName,
|
||||
@JsonProperty("isInputHyperUnique") Boolean isInputHyperUnique
|
||||
@JsonProperty("isInputHyperUnique") boolean isInputHyperUnique,
|
||||
@JsonProperty("round") boolean round
|
||||
)
|
||||
{
|
||||
this.name = name;
|
||||
this.fieldName = fieldName;
|
||||
this.isInputHyperUnique = (isInputHyperUnique == null) ? false : isInputHyperUnique;
|
||||
this.isInputHyperUnique = isInputHyperUnique;
|
||||
this.round = round;
|
||||
}
|
||||
|
||||
public HyperUniquesAggregatorFactory(
|
||||
|
@ -79,7 +90,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
String fieldName
|
||||
)
|
||||
{
|
||||
this(name, fieldName, false);
|
||||
this(name, fieldName, false, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -147,7 +158,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public AggregatorFactory getCombiningFactory()
|
||||
{
|
||||
return new HyperUniquesAggregatorFactory(name, name, false);
|
||||
return new HyperUniquesAggregatorFactory(name, name, false, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -166,7 +177,8 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
return Arrays.<AggregatorFactory>asList(new HyperUniquesAggregatorFactory(
|
||||
fieldName,
|
||||
fieldName,
|
||||
isInputHyperUnique
|
||||
isInputHyperUnique,
|
||||
round
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -192,7 +204,7 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
@Override
|
||||
public Object finalizeComputation(Object object)
|
||||
{
|
||||
return estimateCardinality(object);
|
||||
return estimateCardinality(object, round);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -220,15 +232,19 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
return isInputHyperUnique;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public boolean isRound()
|
||||
{
|
||||
return round;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getCacheKey()
|
||||
{
|
||||
byte[] fieldNameBytes = StringUtils.toUtf8(fieldName);
|
||||
|
||||
return ByteBuffer.allocate(1 + fieldNameBytes.length)
|
||||
.put(AggregatorUtil.HYPER_UNIQUE_CACHE_TYPE_ID)
|
||||
.put(fieldNameBytes)
|
||||
.array();
|
||||
return new CacheKeyBuilder(AggregatorUtil.HYPER_UNIQUE_CACHE_TYPE_ID)
|
||||
.appendString(fieldName)
|
||||
.appendBoolean(round)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -254,11 +270,12 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
"name='" + name + '\'' +
|
||||
", fieldName='" + fieldName + '\'' +
|
||||
", isInputHyperUnique=" + isInputHyperUnique +
|
||||
", round=" + round +
|
||||
'}';
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
public boolean equals(final Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
|
@ -266,16 +283,16 @@ public class HyperUniquesAggregatorFactory extends AggregatorFactory
|
|||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o;
|
||||
|
||||
return Objects.equals(fieldName, that.fieldName) && Objects.equals(name, that.name) &&
|
||||
Objects.equals(isInputHyperUnique, that.isInputHyperUnique);
|
||||
final HyperUniquesAggregatorFactory that = (HyperUniquesAggregatorFactory) o;
|
||||
return isInputHyperUnique == that.isInputHyperUnique &&
|
||||
round == that.round &&
|
||||
Objects.equals(name, that.name) &&
|
||||
Objects.equals(fieldName, that.fieldName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(name, fieldName, isInputHyperUnique);
|
||||
return Objects.hash(name, fieldName, isInputHyperUnique, round);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -158,6 +158,12 @@ public class QueryRunnerTestHelper
|
|||
"uniques",
|
||||
"quality_uniques"
|
||||
);
|
||||
public static final HyperUniquesAggregatorFactory qualityUniquesRounded = new HyperUniquesAggregatorFactory(
|
||||
"uniques",
|
||||
"quality_uniques",
|
||||
false,
|
||||
true
|
||||
);
|
||||
public static final CardinalityAggregatorFactory qualityCardinality = new CardinalityAggregatorFactory(
|
||||
"cardinality",
|
||||
Arrays.<DimensionSpec>asList(new DefaultDimensionSpec("quality", "quality")),
|
||||
|
|
|
@ -286,6 +286,7 @@ public class CardinalityAggregatorTest
|
|||
List<ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>> dimInfoList;
|
||||
List<DimensionSelector> selectorList;
|
||||
CardinalityAggregatorFactory rowAggregatorFactory;
|
||||
CardinalityAggregatorFactory rowAggregatorFactoryRounded;
|
||||
CardinalityAggregatorFactory valueAggregatorFactory;
|
||||
final TestDimensionSelector dim1;
|
||||
final TestDimensionSelector dim2;
|
||||
|
@ -335,6 +336,17 @@ public class CardinalityAggregatorTest
|
|||
true
|
||||
);
|
||||
|
||||
rowAggregatorFactoryRounded = new CardinalityAggregatorFactory(
|
||||
"billy",
|
||||
null,
|
||||
Lists.<DimensionSpec>newArrayList(
|
||||
dimSpec1,
|
||||
dimSpec2
|
||||
),
|
||||
true,
|
||||
true
|
||||
);
|
||||
|
||||
valueAggregatorFactory = new CardinalityAggregatorFactory(
|
||||
"billy",
|
||||
Lists.<DimensionSpec>newArrayList(
|
||||
|
@ -403,6 +415,7 @@ public class CardinalityAggregatorTest
|
|||
aggregate(selectorList, agg);
|
||||
}
|
||||
Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get()), 0.05);
|
||||
Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -418,6 +431,7 @@ public class CardinalityAggregatorTest
|
|||
aggregate(selectorList, agg);
|
||||
}
|
||||
Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get()), 0.05);
|
||||
Assert.assertEquals(7L, rowAggregatorFactoryRounded.finalizeComputation(agg.get()));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -439,6 +453,7 @@ public class CardinalityAggregatorTest
|
|||
bufferAggregate(selectorList, agg, buf, pos);
|
||||
}
|
||||
Assert.assertEquals(9.0, (Double) rowAggregatorFactory.finalizeComputation(agg.get(buf, pos)), 0.05);
|
||||
Assert.assertEquals(9L, rowAggregatorFactoryRounded.finalizeComputation(agg.get(buf, pos)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -460,6 +475,7 @@ public class CardinalityAggregatorTest
|
|||
bufferAggregate(selectorList, agg, buf, pos);
|
||||
}
|
||||
Assert.assertEquals(7.0, (Double) valueAggregatorFactory.finalizeComputation(agg.get(buf, pos)), 0.05);
|
||||
Assert.assertEquals(7L, rowAggregatorFactoryRounded.finalizeComputation(agg.get(buf, pos)));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -606,11 +622,13 @@ public class CardinalityAggregatorTest
|
|||
{
|
||||
CardinalityAggregatorFactory factory = new CardinalityAggregatorFactory(
|
||||
"billy",
|
||||
null,
|
||||
ImmutableList.<DimensionSpec>of(
|
||||
new DefaultDimensionSpec("b", "b"),
|
||||
new DefaultDimensionSpec("a", "a"),
|
||||
new DefaultDimensionSpec("c", "c")
|
||||
),
|
||||
true,
|
||||
true
|
||||
);
|
||||
ObjectMapper objectMapper = new DefaultObjectMapper();
|
||||
|
@ -619,7 +637,13 @@ public class CardinalityAggregatorTest
|
|||
objectMapper.readValue(objectMapper.writeValueAsString(factory), AggregatorFactory.class)
|
||||
);
|
||||
|
||||
String fieldNamesOnly = "{\"type\":\"cardinality\",\"name\":\"billy\",\"fields\":[\"b\",\"a\",\"c\"],\"byRow\":true}";
|
||||
String fieldNamesOnly = "{"
|
||||
+ "\"type\":\"cardinality\","
|
||||
+ "\"name\":\"billy\","
|
||||
+ "\"fields\":[\"b\",\"a\",\"c\"],"
|
||||
+ "\"byRow\":true,"
|
||||
+ "\"round\":true"
|
||||
+ "}";
|
||||
Assert.assertEquals(
|
||||
factory,
|
||||
objectMapper.readValue(fieldNamesOnly, AggregatorFactory.class)
|
||||
|
|
|
@ -23,9 +23,13 @@ import com.google.common.collect.ImmutableMap;
|
|||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import io.druid.hll.HyperLogLogCollector;
|
||||
import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory;
|
||||
import io.druid.query.dimension.DefaultDimensionSpec;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Random;
|
||||
|
||||
/**
|
||||
|
@ -52,4 +56,35 @@ public class HyperUniqueFinalizingPostAggregatorTest
|
|||
|
||||
Assert.assertTrue(cardinality == 99.37233005831612);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComputeRounded() throws Exception
|
||||
{
|
||||
Random random = new Random(0L);
|
||||
HyperUniqueFinalizingPostAggregator postAggregator = new HyperUniqueFinalizingPostAggregator(
|
||||
"uniques", "uniques"
|
||||
).decorate(
|
||||
ImmutableMap.of(
|
||||
"uniques",
|
||||
new CardinalityAggregatorFactory(
|
||||
"uniques",
|
||||
null,
|
||||
Collections.singletonList(DefaultDimensionSpec.of("dummy")),
|
||||
false,
|
||||
true
|
||||
)
|
||||
)
|
||||
);
|
||||
HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
byte[] hashedVal = fn.hashLong(random.nextLong()).asBytes();
|
||||
collector.add(hashedVal);
|
||||
}
|
||||
|
||||
Object cardinality = postAggregator.compute(ImmutableMap.<String, Object>of("uniques", collector));
|
||||
|
||||
Assert.assertThat(cardinality, CoreMatchers.instanceOf(Long.class));
|
||||
Assert.assertEquals(99L, cardinality);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,14 @@
|
|||
|
||||
package io.druid.query.aggregation.hyperloglog;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.hash.HashFunction;
|
||||
import com.google.common.hash.Hashing;
|
||||
import io.druid.hll.HLLCV0;
|
||||
import io.druid.hll.HyperLogLogCollector;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.segment.TestHelper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -83,8 +86,8 @@ public class HyperUniquesAggregatorFactoryTest
|
|||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -102,8 +105,8 @@ public class HyperUniquesAggregatorFactoryTest
|
|||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -121,8 +124,8 @@ public class HyperUniquesAggregatorFactoryTest
|
|||
}
|
||||
|
||||
Assert.assertEquals(
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
Double.compare(collector1.estimateCardinality(), collector2.estimateCardinality()),
|
||||
comparator.compare(collector1, collector2)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -150,20 +153,42 @@ public class HyperUniquesAggregatorFactoryTest
|
|||
}
|
||||
|
||||
// when
|
||||
final int orderedByCardinality = Double.compare(leftCollector.estimateCardinality(),
|
||||
rightCollector.estimateCardinality());
|
||||
final int orderedByCardinality = Double.compare(
|
||||
leftCollector.estimateCardinality(),
|
||||
rightCollector.estimateCardinality()
|
||||
);
|
||||
final int orderedByComparator = comparator.compare(leftCollector, rightCollector);
|
||||
|
||||
// then, assert hyperloglog comparator behaves consistently with estimated cardinalities
|
||||
Assert.assertEquals(
|
||||
StringUtils.format("orderedByComparator=%d, orderedByCardinality=%d,\n" +
|
||||
"Left={cardinality=%f, hll=%s},\n" +
|
||||
"Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality,
|
||||
leftCollector.estimateCardinality(), leftCollector,
|
||||
rightCollector.estimateCardinality(), rightCollector),
|
||||
orderedByCardinality,
|
||||
orderedByComparator
|
||||
StringUtils.format("orderedByComparator=%d, orderedByCardinality=%d,\n" +
|
||||
"Left={cardinality=%f, hll=%s},\n" +
|
||||
"Right={cardinality=%f, hll=%s},\n", orderedByComparator, orderedByCardinality,
|
||||
leftCollector.estimateCardinality(), leftCollector,
|
||||
rightCollector.estimateCardinality(), rightCollector
|
||||
),
|
||||
orderedByCardinality,
|
||||
orderedByComparator
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
final HyperUniquesAggregatorFactory factory = new HyperUniquesAggregatorFactory(
|
||||
"foo",
|
||||
"bar",
|
||||
true,
|
||||
true
|
||||
);
|
||||
|
||||
final ObjectMapper jsonMapper = TestHelper.getJsonMapper();
|
||||
final AggregatorFactory factory2 = jsonMapper.readValue(
|
||||
jsonMapper.writeValueAsString(factory),
|
||||
AggregatorFactory.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(factory, factory2);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -768,6 +768,56 @@ public class TopNQueryRunnerTest
|
|||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNOverHyperUniqueExpressionRounded()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.dataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(QueryRunnerTestHelper.marketDimension)
|
||||
.metric(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric)
|
||||
.threshold(3)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(QueryRunnerTestHelper.qualityUniquesRounded)
|
||||
)
|
||||
.postAggregators(
|
||||
Collections.singletonList(new ExpressionPostAggregator(
|
||||
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
|
||||
"uniques + 1",
|
||||
null,
|
||||
TestExprMacroTable.INSTANCE
|
||||
))
|
||||
)
|
||||
.build();
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||
new Result<>(
|
||||
DateTimes.of("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("market", "spot")
|
||||
.put(QueryRunnerTestHelper.uniqueMetric, 9L)
|
||||
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, 10L)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("market", "total_market")
|
||||
.put(QueryRunnerTestHelper.uniqueMetric, 2L)
|
||||
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, 3L)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put("market", "upfront")
|
||||
.put(QueryRunnerTestHelper.uniqueMetric, 2L)
|
||||
.put(QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric, 3L)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
assertExpectedResults(expectedResults, query);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNOverFirstLastAggregator()
|
||||
{
|
||||
|
|
|
@ -91,7 +91,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
final AggregatorFactory aggregatorFactory;
|
||||
|
||||
if (input.isDirectColumnAccess() && rowSignature.getColumnType(input.getDirectColumn()) == ValueType.COMPLEX) {
|
||||
aggregatorFactory = new HyperUniquesAggregatorFactory(name, input.getDirectColumn());
|
||||
aggregatorFactory = new HyperUniquesAggregatorFactory(name, input.getDirectColumn(), false, true);
|
||||
} else {
|
||||
final SqlTypeName sqlTypeName = rexNode.getType().getSqlTypeName();
|
||||
final ValueType inputType = Calcites.getValueTypeForSqlTypeName(sqlTypeName);
|
||||
|
@ -113,7 +113,7 @@ public class ApproxCountDistinctSqlAggregator implements SqlAggregator
|
|||
virtualColumns.add(virtualColumn);
|
||||
}
|
||||
|
||||
aggregatorFactory = new CardinalityAggregatorFactory(name, ImmutableList.of(dimensionSpec), false);
|
||||
aggregatorFactory = new CardinalityAggregatorFactory(name, null, ImmutableList.of(dimensionSpec), false, true);
|
||||
}
|
||||
|
||||
return Aggregation.create(virtualColumns, aggregatorFactory).filter(filter);
|
||||
|
|
|
@ -2979,9 +2979,10 @@ public class CalciteQueryTest
|
|||
"a1",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("dim2", null)),
|
||||
false
|
||||
false,
|
||||
true
|
||||
),
|
||||
new HyperUniquesAggregatorFactory("a2", "unique_dim1")
|
||||
new HyperUniquesAggregatorFactory("a2", "unique_dim1", false, true)
|
||||
)
|
||||
)
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
|
@ -3050,7 +3051,8 @@ public class CalciteQueryTest
|
|||
"a0",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("dim2", null)),
|
||||
false
|
||||
false,
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -3136,19 +3138,22 @@ public class CalciteQueryTest
|
|||
"a1",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("dim2", "dim2")),
|
||||
false
|
||||
false,
|
||||
true
|
||||
),
|
||||
new FilteredAggregatorFactory(
|
||||
new CardinalityAggregatorFactory(
|
||||
"a2",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("dim2", "dim2")),
|
||||
false
|
||||
false,
|
||||
true
|
||||
),
|
||||
NOT(SELECTOR("dim2", "", null))
|
||||
),
|
||||
new CardinalityAggregatorFactory(
|
||||
"a3",
|
||||
null,
|
||||
DIMS(
|
||||
new ExtractionDimensionSpec(
|
||||
"dim2",
|
||||
|
@ -3157,14 +3162,17 @@ public class CalciteQueryTest
|
|||
new SubstringDimExtractionFn(0, 1)
|
||||
)
|
||||
),
|
||||
false
|
||||
false,
|
||||
true
|
||||
),
|
||||
new CardinalityAggregatorFactory(
|
||||
"a4",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("a4:v", "a4:v", ValueType.STRING)),
|
||||
false
|
||||
false,
|
||||
true
|
||||
),
|
||||
new HyperUniquesAggregatorFactory("a5", "unique_dim1")
|
||||
new HyperUniquesAggregatorFactory("a5", "unique_dim1", false, true)
|
||||
)
|
||||
)
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
|
@ -3549,8 +3557,10 @@ public class CalciteQueryTest
|
|||
new CountAggregatorFactory("a0"),
|
||||
new CardinalityAggregatorFactory(
|
||||
"a1",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("d0", null)),
|
||||
false
|
||||
false,
|
||||
true
|
||||
)
|
||||
))
|
||||
.setPostAggregatorSpecs(
|
||||
|
@ -3562,7 +3572,7 @@ public class CalciteQueryTest
|
|||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{5L, 5L, -0.12226936f}
|
||||
new Object[]{5L, 5L, 0.0f}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -3683,7 +3693,8 @@ public class CalciteQueryTest
|
|||
"a1",
|
||||
null,
|
||||
DIMS(new DefaultDimensionSpec("dim2", null)),
|
||||
false
|
||||
false,
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -3697,7 +3708,7 @@ public class CalciteQueryTest
|
|||
.build()
|
||||
),
|
||||
ImmutableList.of(
|
||||
new Object[]{6L, 3L, 3.0021994f, 1L, 4L, 4.9985347f}
|
||||
new Object[]{6L, 3L, 3.0f, 2L, 5L, 5.0f}
|
||||
)
|
||||
);
|
||||
}
|
||||
|
@ -3717,6 +3728,7 @@ public class CalciteQueryTest
|
|||
AGGS(
|
||||
new CardinalityAggregatorFactory(
|
||||
"a0",
|
||||
null,
|
||||
DIMS(
|
||||
new ExtractionDimensionSpec(
|
||||
"dim1",
|
||||
|
@ -3724,7 +3736,8 @@ public class CalciteQueryTest
|
|||
new SubstringDimExtractionFn(0, 1)
|
||||
)
|
||||
),
|
||||
false
|
||||
false,
|
||||
true
|
||||
)
|
||||
)
|
||||
)
|
||||
|
@ -4408,8 +4421,10 @@ public class CalciteQueryTest
|
|||
.aggregators(AGGS(
|
||||
new CardinalityAggregatorFactory(
|
||||
"a0",
|
||||
null,
|
||||
ImmutableList.<DimensionSpec>of(new ExtractionDimensionSpec("dim1", null, extractionFn)),
|
||||
false
|
||||
false,
|
||||
true
|
||||
)
|
||||
))
|
||||
.context(TIMESERIES_CONTEXT_DEFAULT)
|
||||
|
|
Loading…
Reference in New Issue