diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java index 58f5b2eb9b4..409cc757006 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java @@ -32,15 +32,18 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import io.druid.java.util.common.IAE; +import io.druid.query.aggregation.AggregateCombiner; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorUtil; import io.druid.query.aggregation.BufferAggregator; +import io.druid.query.aggregation.ObjectAggregateCombiner; import io.druid.query.cache.CacheKeyBuilder; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.segment.BaseDoubleColumnValueSelector; import io.druid.segment.BaseObjectColumnValueSelector; import io.druid.segment.ColumnSelectorFactory; +import io.druid.segment.ColumnValueSelector; import io.druid.segment.DimensionSelector; import io.druid.segment.DimensionSelectorUtils; import io.druid.segment.NilColumnValueSelector; @@ -173,6 +176,42 @@ public class ArrayOfDoublesSketchAggregatorFactory extends AggregatorFactory return union.getResult(); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new ObjectAggregateCombiner() + { + private final ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries) + .setNumberOfValues(numberOfValues).buildUnion(); + + @Override + public void reset(final ColumnValueSelector selector) + { + union.reset(); + fold(selector); + } + + @Override + public void fold(final ColumnValueSelector selector) + { + final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) selector.getObject(); + union.update(sketch); + } + + @Override + public ArrayOfDoublesSketch getObject() + { + return union.getResult(); + } + + @Override + public Class classOfObject() + { + return ArrayOfDoublesSketch.class; + } + }; + } + @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java new file mode 100644 index 00000000000..d11d8f0cd61 --- /dev/null +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.query.aggregation.datasketches.tuple; + +import org.junit.Assert; +import org.junit.Test; + +import com.yahoo.sketches.tuple.ArrayOfDoublesSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch; +import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder; + +import io.druid.query.aggregation.AggregateCombiner; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.TestObjectColumnSelector; + +public class ArrayOfDoublesSketchAggregatorFactoryTest +{ + + @Test + public void makeAggregateCombiner() + { + AggregatorFactory aggregatorFactory = new ArrayOfDoublesSketchAggregatorFactory("", "", null, null, null); + AggregatorFactory combiningFactory = aggregatorFactory.getCombiningFactory(); + AggregateCombiner combiner = combiningFactory.makeAggregateCombiner(); + + ArrayOfDoublesUpdatableSketch sketch1 = new ArrayOfDoublesUpdatableSketchBuilder().build(); + sketch1.update("a", new double[] {1}); + + ArrayOfDoublesUpdatableSketch sketch2 = new ArrayOfDoublesUpdatableSketchBuilder().build(); + sketch2.update("b", new double[] {1}); + sketch2.update("c", new double[] {1}); + + TestObjectColumnSelector selector = new TestObjectColumnSelector(new ArrayOfDoublesSketch[] {sketch1, sketch2}); + + combiner.reset(selector); + Assert.assertEquals(1, combiner.getObject().getEstimate(), 0); + + selector.increment(); + combiner.fold(selector); + Assert.assertEquals(3, combiner.getObject().getEstimate(), 0); + } + +}