diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java index 80f286b4e39..4b28e268a79 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java @@ -25,17 +25,20 @@ import com.yahoo.sketches.Util; import com.yahoo.sketches.quantiles.DoublesSketch; import com.yahoo.sketches.quantiles.DoublesUnion; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.query.aggregation.AggregateCombiner; import org.apache.druid.query.aggregation.Aggregator; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; import org.apache.druid.query.aggregation.AggregatorUtil; import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.ObjectAggregateCombiner; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.NilColumnValueSelector; import org.apache.druid.segment.column.ValueType; +import javax.annotation.Nullable; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -137,6 +140,42 @@ public class DoublesSketchAggregatorFactory extends AggregatorFactory return union.getResultAndReset(); } + @Override + public AggregateCombiner makeAggregateCombiner() + { + return new ObjectAggregateCombiner() + { + private final DoublesUnion union = DoublesUnion.builder().setMaxK(k).build(); + + @Override + public void reset(final ColumnValueSelector selector) + { + union.reset(); + fold(selector); + } + + @Override + public void fold(final ColumnValueSelector selector) + { + final DoublesSketch sketch = (DoublesSketch) selector.getObject(); + union.update(sketch); + } + + @Nullable + @Override + public DoublesSketch getObject() + { + return union.getResult(); + } + + @Override + public Class classOfObject() + { + return DoublesSketch.class; + } + }; + } + @Override @JsonProperty public String getName()