diff --git a/extensions-core/datasketches/pom.xml b/extensions-core/datasketches/pom.xml
index 54e9ceaa5f8..30413806db8 100644
--- a/extensions-core/datasketches/pom.xml
+++ b/extensions-core/datasketches/pom.xml
@@ -40,6 +40,10 @@
sketches-core0.10.3
+
+ org.apache.commons
+ commons-math3
+ io.druiddruid-api
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
new file mode 100644
index 00000000000..1caadd7190b
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+import io.druid.query.dimension.DefaultDimensionSpec;
+import io.druid.segment.BaseDoubleColumnValueSelector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+import io.druid.segment.ColumnSelectorFactory;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.DimensionSelectorUtils;
+import io.druid.segment.NilColumnValueSelector;
+
+import com.yahoo.sketches.Util;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder;
+
+public class ArrayOfDoublesSketchAggregatorFactory extends AggregatorFactory
+{
+
+ public static final Comparator COMPARATOR =
+ Comparator.nullsFirst(Comparator.comparingDouble(ArrayOfDoublesSketch::getEstimate));
+
+ private final String name;
+ private final String fieldName;
+ private final int nominalEntries;
+ private final int numberOfValues;
+ // if specified indicates building sketched from raw data, and also implies the number of values
+ @Nullable private final List metricColumns;
+
+ @JsonCreator
+ public ArrayOfDoublesSketchAggregatorFactory(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fieldName") final String fieldName,
+ @JsonProperty("nominalEntries") @Nullable final Integer nominalEntries,
+ @JsonProperty("metricColumns") @Nullable final List metricColumns,
+ @JsonProperty("numberOfValues") @Nullable final Integer numberOfValues
+ )
+ {
+ this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
+ this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
+ this.nominalEntries = nominalEntries == null ? Util.DEFAULT_NOMINAL_ENTRIES : nominalEntries;
+ Util.checkIfPowerOf2(this.nominalEntries, "nominalEntries");
+ this.metricColumns = metricColumns;
+ this.numberOfValues = numberOfValues == null ? (metricColumns == null ? 1 : metricColumns.size()) : numberOfValues;
+ if (metricColumns != null && metricColumns.size() != this.numberOfValues) {
+ throw new IAE(
+ "Number of metricColumns [%d] must agree with numValues [%d]",
+ metricColumns.size(),
+ this.numberOfValues
+ );
+ }
+ }
+
+ @Override
+ public Aggregator factorize(final ColumnSelectorFactory metricFactory)
+ {
+ if (metricColumns == null) { // input is sketches, use merge aggregator
+ final BaseObjectColumnValueSelector selector = metricFactory
+ .makeColumnValueSelector(fieldName);
+ if (selector instanceof NilColumnValueSelector) {
+ return new ArrayOfDoublesSketchNoOpAggregator(numberOfValues);
+ }
+ return new ArrayOfDoublesSketchMergeAggregator(selector, nominalEntries, numberOfValues);
+ }
+ // input is raw data (key and array of values), use build aggregator
+ final DimensionSelector keySelector = metricFactory
+ .makeDimensionSelector(new DefaultDimensionSpec(fieldName, fieldName));
+ if (DimensionSelectorUtils.isNilSelector(keySelector)) {
+ return new ArrayOfDoublesSketchNoOpAggregator(numberOfValues);
+ }
+ final List valueSelectors = new ArrayList<>();
+ for (final String column : metricColumns) {
+ final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(column);
+ valueSelectors.add(valueSelector);
+ }
+ return new ArrayOfDoublesSketchBuildAggregator(keySelector, valueSelectors, nominalEntries);
+ }
+
+ @Override
+ public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory)
+ {
+ if (metricColumns == null) { // input is sketches, use merge aggregator
+ final BaseObjectColumnValueSelector selector = metricFactory
+ .makeColumnValueSelector(fieldName);
+ if (selector instanceof NilColumnValueSelector) {
+ return new ArrayOfDoublesSketchNoOpBufferAggregator(numberOfValues);
+ }
+ return new ArrayOfDoublesSketchMergeBufferAggregator(
+ selector,
+ nominalEntries,
+ numberOfValues,
+ getMaxIntermediateSize()
+ );
+ }
+ // input is raw data (key and array of values), use build aggregator
+ final DimensionSelector keySelector = metricFactory
+ .makeDimensionSelector(new DefaultDimensionSpec(fieldName, fieldName));
+ if (DimensionSelectorUtils.isNilSelector(keySelector)) {
+ return new ArrayOfDoublesSketchNoOpBufferAggregator(numberOfValues);
+ }
+ final List valueSelectors = new ArrayList<>();
+ for (final String column : metricColumns) {
+ final BaseDoubleColumnValueSelector valueSelector = metricFactory.makeColumnValueSelector(column);
+ valueSelectors.add(valueSelector);
+ }
+ return new ArrayOfDoublesSketchBuildBufferAggregator(
+ keySelector,
+ valueSelectors,
+ nominalEntries,
+ getMaxIntermediateSize()
+ );
+ }
+
+ @Override
+ public Object deserialize(final Object object)
+ {
+ return ArrayOfDoublesSketchOperations.deserialize(object);
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return COMPARATOR;
+ }
+
+ @Override
+ public Object combine(@Nullable final Object lhs, @Nullable final Object rhs)
+ {
+ final ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries)
+ .setNumberOfValues(numberOfValues).buildUnion();
+ if (lhs != null) {
+ union.update((ArrayOfDoublesSketch) lhs);
+ }
+ if (rhs != null) {
+ union.update((ArrayOfDoublesSketch) rhs);
+ }
+ return union.getResult();
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public String getFieldName()
+ {
+ return fieldName;
+ }
+
+ @JsonProperty
+ public int getNominalEntries()
+ {
+ return nominalEntries;
+ }
+
+ @JsonProperty
+ public List getMetricColumns()
+ {
+ return metricColumns;
+ }
+
+ @JsonProperty
+ public int getNumberOfValues()
+ {
+ return numberOfValues;
+ }
+
+ @Override
+ public List requiredFields()
+ {
+ return Collections.singletonList(fieldName);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ final CacheKeyBuilder builder = new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_CACHE_TYPE_ID)
+ .appendString(name)
+ .appendString(fieldName)
+ .appendInt(nominalEntries)
+ .appendInt(numberOfValues);
+ if (metricColumns != null) {
+ builder.appendStrings(metricColumns);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public int getMaxIntermediateSize()
+ {
+ return ArrayOfDoublesUnion.getMaxBytes(nominalEntries, numberOfValues);
+ }
+
+ @Override
+ public List getRequiredColumns()
+ {
+ return Collections.singletonList(
+ new ArrayOfDoublesSketchAggregatorFactory(
+ fieldName,
+ fieldName,
+ nominalEntries,
+ metricColumns,
+ numberOfValues
+ )
+ );
+ }
+
+ @Override
+ public AggregatorFactory getCombiningFactory()
+ {
+ return new ArrayOfDoublesSketchAggregatorFactory(name, name, nominalEntries, null, numberOfValues);
+ }
+
+ @Override
+ public Object finalizeComputation(final Object object)
+ {
+ return ((ArrayOfDoublesSketch) object).getEstimate();
+ }
+
+ @Override
+ public String getTypeName()
+ {
+ if (metricColumns == null) {
+ return ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG;
+ }
+ return ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ArrayOfDoublesSketchAggregatorFactory)) {
+ return false;
+ }
+ final ArrayOfDoublesSketchAggregatorFactory that = (ArrayOfDoublesSketchAggregatorFactory) o;
+ if (!name.equals(that.name)) {
+ return false;
+ }
+ if (!fieldName.equals(that.fieldName)) {
+ return false;
+ }
+ if (nominalEntries != that.nominalEntries) {
+ return false;
+ }
+ if (!Objects.equals(metricColumns, that.metricColumns)) {
+ return false;
+ }
+ return numberOfValues != that.numberOfValues;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, fieldName, nominalEntries, metricColumns, numberOfValues);
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "{"
+ + "fieldName=" + fieldName
+ + ", name=" + name
+ + ", nominalEntries=" + nominalEntries
+ + ", metricColumns=" + metricColumns
+ + ", numberOfValues=" + numberOfValues
+ + "}";
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
new file mode 100644
index 00000000000..57b038af811
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.segment.BaseDoubleColumnValueSelector;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+
+import java.util.List;
+
+/**
+ * This aggregator builds sketches from raw data.
+ * The input is in the form of a key and array of double values.
+ * The output is {@link com.yahoo.sketches.tuple.ArrayOfDoublesSketch}.
+ */
+public class ArrayOfDoublesSketchBuildAggregator implements Aggregator
+{
+
+ private final DimensionSelector keySelector;
+ private final BaseDoubleColumnValueSelector[] valueSelectors;
+ private double[] values; // not part of the state, but to reuse in aggregate() method
+ private ArrayOfDoublesUpdatableSketch sketch;
+
+ public ArrayOfDoublesSketchBuildAggregator(
+ final DimensionSelector keySelector,
+ final List valueSelectors,
+ final int nominalEntries
+ )
+ {
+ this.keySelector = keySelector;
+ this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]);
+ values = new double[valueSelectors.size()];
+ sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
+ .setNumberOfValues(valueSelectors.size()).build();
+ }
+
+ /**
+ * This method uses synchronization because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ */
+ @Override
+ public void aggregate()
+ {
+ final IndexedInts keys = keySelector.getRow();
+ for (int i = 0; i < valueSelectors.length; i++) {
+ values[i] = valueSelectors[i].getDouble();
+ }
+ synchronized (this) {
+ for (int i = 0, keysSize = keys.size(); i < keysSize; i++) {
+ final String key = keySelector.lookupName(keys.get(i));
+ sketch.update(key, values);
+ }
+ }
+ }
+
+ /**
+ * This method uses synchronization because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ * The returned sketch is a separate instance of ArrayOfDoublesCompactSketch
+ * representing the current state of the aggregation, and is not affected by consequent
+ * aggregate() calls
+ */
+ @Override
+ public synchronized Object get()
+ {
+ return sketch.compact();
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ sketch = null;
+ values = null;
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
new file mode 100644
index 00000000000..b3cbcdae8fc
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.google.common.util.concurrent.Striped;
+import com.yahoo.memory.WritableMemory;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketches;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseDoubleColumnValueSelector;
+import io.druid.segment.DimensionSelector;
+import io.druid.segment.data.IndexedInts;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+/**
+ * This aggregator builds sketches from raw data.
+ * The input is in the form of a key and array of double values.
+ * The output is {@link com.yahoo.sketches.tuple.ArrayOfDoublesSketch}.
+ */
+public class ArrayOfDoublesSketchBuildBufferAggregator implements BufferAggregator
+{
+
+ private static final int NUM_STRIPES = 64; // for locking per buffer position (power of 2 to make index computation faster)
+
+ private final DimensionSelector keySelector;
+ private final BaseDoubleColumnValueSelector[] valueSelectors;
+ private final int nominalEntries;
+ private final int maxIntermediateSize;
+ private double[] values; // not part of the state, but to reuse in aggregate() method
+ private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES);
+
+ public ArrayOfDoublesSketchBuildBufferAggregator(
+ final DimensionSelector keySelector,
+ final List valueSelectors,
+ int nominalEntries,
+ int maxIntermediateSize
+ )
+ {
+ this.keySelector = keySelector;
+ this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]);
+ this.nominalEntries = nominalEntries;
+ this.maxIntermediateSize = maxIntermediateSize;
+ values = new double[valueSelectors.size()];
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ final WritableMemory mem = WritableMemory.wrap(buf);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
+ .setNumberOfValues(valueSelectors.length)
+ .setNumberOfValues(valueSelectors.length).build(region);
+ }
+
+ /**
+ * This method uses locks because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ */
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position)
+ {
+ for (int i = 0; i < valueSelectors.length; i++) {
+ values[i] = valueSelectors[i].getDouble();
+ }
+ final IndexedInts keys = keySelector.getRow();
+ // Wrapping memory and ArrayOfDoublesSketch is inexpensive compared to sketch operations.
+ // Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator
+ // might might be considered, but it would increase complexity including relocate() support.
+ final WritableMemory mem = WritableMemory.wrap(buf);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ final Lock lock = stripedLock.getAt(lockIndex(position)).writeLock();
+ lock.lock();
+ try {
+ final ArrayOfDoublesUpdatableSketch sketch = ArrayOfDoublesSketches.wrapUpdatableSketch(region);
+ for (int i = 0, keysSize = keys.size(); i < keysSize; i++) {
+ final String key = keySelector.lookupName(keys.get(i));
+ sketch.update(key, values);
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * This method uses locks because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ * The returned sketch is a separate instance of ArrayOfDoublesCompactSketch
+ * representing the current state of the aggregation, and is not affected by consequent
+ * aggregate() calls
+ */
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ final WritableMemory mem = WritableMemory.wrap(buf);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ final Lock lock = stripedLock.getAt(lockIndex(position)).readLock();
+ lock.lock();
+ try {
+ final ArrayOfDoublesUpdatableSketch sketch = (ArrayOfDoublesUpdatableSketch) ArrayOfDoublesSketches
+ .wrapSketch(region);
+ return sketch.compact();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public float getFloat(final ByteBuffer buf, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getLong(final ByteBuffer buf, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ values = null;
+ }
+
+ @Override
+ public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
+ {
+ inspector.visit("keySelector", keySelector);
+ inspector.visit("valueSelectors", valueSelectors);
+ }
+
+ // compute lock index to avoid boxing in Striped.get() call
+ static int lockIndex(final int position)
+ {
+ return smear(position) % NUM_STRIPES;
+ }
+
+ // from https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/Striped.java#L536-L548
+ private static int smear(int hashCode)
+ {
+ hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
+ return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildComplexMetricSerde.java
new file mode 100644
index 00000000000..f8af7ad7eca
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildComplexMetricSerde.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import io.druid.data.input.InputRow;
+import io.druid.segment.serde.ComplexMetricExtractor;
+
+public class ArrayOfDoublesSketchBuildComplexMetricSerde extends ArrayOfDoublesSketchMergeComplexMetricSerde
+{
+
+ @Override
+ public ComplexMetricExtractor getExtractor()
+ {
+ return new ComplexMetricExtractor()
+ {
+ @Override
+ public Class> extractedClass()
+ {
+ return Object.class;
+ }
+
+ @Override
+ public Object extractValue(final InputRow inputRow, final String metricName)
+ {
+ return inputRow.getRaw(metricName);
+ }
+ };
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchJsonSerializer.java
new file mode 100644
index 00000000000..4ee39f79ea4
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchJsonSerializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.io.IOException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+public class ArrayOfDoublesSketchJsonSerializer extends JsonSerializer
+{
+
+ @Override
+ public void serialize(
+ final ArrayOfDoublesSketch sketch,
+ final JsonGenerator generator,
+ final SerializerProvider provider
+ ) throws IOException, JsonProcessingException
+ {
+ generator.writeBinary(sketch.toByteArray());
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
new file mode 100644
index 00000000000..ed201c5f870
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeAggregator.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
+import io.druid.query.aggregation.Aggregator;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+/**
+ * This aggregator merges existing sketches.
+ * The input column contains ArrayOfDoublesSketch.
+ * The output is {@link ArrayOfDoublesSketch} that is a union of the input sketches.
+ */
+public class ArrayOfDoublesSketchMergeAggregator implements Aggregator
+{
+
+ private final BaseObjectColumnValueSelector selector;
+ private ArrayOfDoublesUnion union;
+
+ public ArrayOfDoublesSketchMergeAggregator(
+ final BaseObjectColumnValueSelector selector,
+ final int nominalEntries,
+ final int numberOfValues
+ )
+ {
+ this.selector = selector;
+ union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries).setNumberOfValues(numberOfValues)
+ .buildUnion();
+ }
+
+ /**
+ * This method uses synchronization because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ */
+ @Override
+ public void aggregate()
+ {
+ final ArrayOfDoublesSketch update = selector.getObject();
+ if (update == null) {
+ return;
+ }
+ synchronized (this) {
+ union.update(update);
+ }
+ }
+
+ /**
+ * This method uses synchronization because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ * The returned sketch is a separate instance of ArrayOfDoublesCompactSketch
+ * representing the current state of the aggregation, and is not affected by consequent
+ * aggregate() calls
+ */
+ @Override
+ public synchronized Object get()
+ {
+ return union.getResult();
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ union = null;
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java
new file mode 100644
index 00000000000..95e317ae42b
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeBufferAggregator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.google.common.util.concurrent.Striped;
+import com.yahoo.memory.WritableMemory;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketches;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+import io.druid.segment.BaseObjectColumnValueSelector;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+/**
+ * This aggregator merges existing sketches.
+ * The input column contains ArrayOfDoublesSketch.
+ * The output is {@link ArrayOfDoublesSketch} that is a union of the input sketches.
+ */
+public class ArrayOfDoublesSketchMergeBufferAggregator implements BufferAggregator
+{
+
+ private static final int NUM_STRIPES = 64; // for locking per buffer position
+
+ private final BaseObjectColumnValueSelector selector;
+ private final int nominalEntries;
+ private final int numberOfValues;
+ private final int maxIntermediateSize;
+ private final Striped stripedLock = Striped.readWriteLock(NUM_STRIPES);
+
+ public ArrayOfDoublesSketchMergeBufferAggregator(
+ final BaseObjectColumnValueSelector selector,
+ final int nominalEntries,
+ final int numberOfValues,
+ final int maxIntermediateSize
+ )
+ {
+ this.selector = selector;
+ this.nominalEntries = nominalEntries;
+ this.numberOfValues = numberOfValues;
+ this.maxIntermediateSize = maxIntermediateSize;
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ final WritableMemory mem = WritableMemory.wrap(buf);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries)
+ .setNumberOfValues(numberOfValues).buildUnion(region);
+ }
+
+ /**
+ * This method uses locks because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ */
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position)
+ {
+ final ArrayOfDoublesSketch update = selector.getObject();
+ if (update == null) {
+ return;
+ }
+ // Wrapping memory and ArrayOfDoublesUnion is inexpensive compared to union operations.
+ // Maintaining a cache of wrapped objects per buffer position like in Theta sketch aggregator
+ // might might be considered, but it would increase complexity including relocate() support.
+ final WritableMemory mem = WritableMemory.wrap(buf);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).writeLock();
+ lock.lock();
+ try {
+ final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region);
+ union.update(update);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * This method uses locks because it can be used during indexing,
+ * and Druid can call aggregate() and get() concurrently
+ * https://github.com/druid-io/druid/pull/3956
+ * The returned sketch is a separate instance of ArrayOfDoublesCompactSketch
+ * representing the current state of the aggregation, and is not affected by consequent
+ * aggregate() calls
+ */
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ final WritableMemory mem = WritableMemory.wrap(buf);
+ final WritableMemory region = mem.writableRegion(position, maxIntermediateSize);
+ final Lock lock = stripedLock.getAt(ArrayOfDoublesSketchBuildBufferAggregator.lockIndex(position)).readLock();
+ lock.lock();
+ try {
+ final ArrayOfDoublesUnion union = ArrayOfDoublesSketches.wrapUnion(region);
+ return union.getResult();
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public float getFloat(final ByteBuffer buf, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getLong(final ByteBuffer buf, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
+ public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
+ {
+ inspector.visit("selector", selector);
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeComplexMetricSerde.java
new file mode 100644
index 00000000000..06fcedf4405
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMergeComplexMetricSerde.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.nio.ByteBuffer;
+
+import io.druid.data.input.InputRow;
+import io.druid.segment.GenericColumnSerializer;
+import io.druid.segment.column.ColumnBuilder;
+import io.druid.segment.data.GenericIndexed;
+import io.druid.segment.data.ObjectStrategy;
+import io.druid.segment.serde.ComplexColumnPartSupplier;
+import io.druid.segment.serde.ComplexMetricExtractor;
+import io.druid.segment.serde.ComplexMetricSerde;
+import io.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer;
+import io.druid.segment.writeout.SegmentWriteOutMedium;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+public class ArrayOfDoublesSketchMergeComplexMetricSerde extends ComplexMetricSerde
+{
+
+ @Override
+ public String getTypeName()
+ {
+ return ArrayOfDoublesSketchModule.ARRAY_OF_DOUBLES_SKETCH;
+ }
+
+ @Override
+ public ComplexMetricExtractor getExtractor()
+ {
+ return new ComplexMetricExtractor()
+ {
+ @Override
+ public Class> extractedClass()
+ {
+ return ArrayOfDoublesSketch.class;
+ }
+
+ @Override
+ public Object extractValue(final InputRow inputRow, final String metricName)
+ {
+ final Object object = inputRow.getRaw(metricName);
+ if (object == null || object instanceof ArrayOfDoublesSketch) {
+ return object;
+ }
+ return ArrayOfDoublesSketchOperations.deserialize(object);
+ }
+ };
+ }
+
+ @Override
+ public void deserializeColumn(final ByteBuffer buffer, final ColumnBuilder builder)
+ {
+ final GenericIndexed ge = GenericIndexed.read(buffer, ArrayOfDoublesSketchObjectStrategy.STRATEGY);
+ builder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), ge));
+ }
+
+ @Override
+ public ObjectStrategy getObjectStrategy()
+ {
+ return ArrayOfDoublesSketchObjectStrategy.STRATEGY;
+ }
+
+ // support large columns
+ @Override
+ public GenericColumnSerializer getSerializer(final SegmentWriteOutMedium segmentWriteOutMedium, final String column)
+ {
+ return LargeColumnSupportedComplexColumnSerializer.create(segmentWriteOutMedium, column, this.getObjectStrategy());
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java
new file mode 100644
index 00000000000..a57868a44ef
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchModule.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+import io.druid.initialization.DruidModule;
+import io.druid.segment.serde.ComplexMetrics;
+
+/**
+ * This module is to support numeric Tuple sketches, which extend the functionality of the count-distinct
+ * Theta sketches by adding arrays of double values associated with unique keys.
+ *
+ * See Tuple Sketch Overview
+ */
+public class ArrayOfDoublesSketchModule implements DruidModule
+{
+
+ public static final String ARRAY_OF_DOUBLES_SKETCH = "arrayOfDoublesSketch";
+
+ public static final String ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG = "arrayOfDoublesSketchMerge";
+ public static final String ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG = "arrayOfDoublesSketchBuild";
+
+ @Override
+ public void configure(final Binder binder)
+ {
+ if (ComplexMetrics.getSerdeForType(ARRAY_OF_DOUBLES_SKETCH) == null) {
+ ComplexMetrics.registerSerde(ARRAY_OF_DOUBLES_SKETCH, new ArrayOfDoublesSketchMergeComplexMetricSerde());
+ }
+
+ if (ComplexMetrics.getSerdeForType(ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG) == null) {
+ ComplexMetrics.registerSerde(
+ ARRAY_OF_DOUBLES_SKETCH_MERGE_AGG,
+ new ArrayOfDoublesSketchMergeComplexMetricSerde()
+ );
+ }
+
+ if (ComplexMetrics.getSerdeForType(ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG) == null) {
+ ComplexMetrics.registerSerde(
+ ARRAY_OF_DOUBLES_SKETCH_BUILD_AGG,
+ new ArrayOfDoublesSketchBuildComplexMetricSerde()
+ );
+ }
+ }
+
+ @Override
+ public List extends Module> getJacksonModules()
+ {
+ return Arrays.asList(
+ new SimpleModule("ArrayOfDoublesSketchModule").registerSubtypes(
+ new NamedType(
+ ArrayOfDoublesSketchAggregatorFactory.class,
+ ARRAY_OF_DOUBLES_SKETCH
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToEstimatePostAggregator.class,
+ "arrayOfDoublesSketchToEstimate"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator.class,
+ "arrayOfDoublesSketchToEstimateAndBounds"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToNumEntriesPostAggregator.class,
+ "arrayOfDoublesSketchToNumEntries"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToMeansPostAggregator.class,
+ "arrayOfDoublesSketchToMeans"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToVariancesPostAggregator.class,
+ "arrayOfDoublesSketchToVariances"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToQuantilesSketchPostAggregator.class,
+ "arrayOfDoublesSketchToQuantilesSketch"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchSetOpPostAggregator.class,
+ "arrayOfDoublesSketchSetOp"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchTTestPostAggregator.class,
+ "arrayOfDoublesSketchTTest"
+ ),
+ new NamedType(
+ ArrayOfDoublesSketchToStringPostAggregator.class,
+ "arrayOfDoublesSketchToString"
+ )
+ ).addSerializer(ArrayOfDoublesSketch.class, new ArrayOfDoublesSketchJsonSerializer())
+ );
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMultiPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMultiPostAggregator.java
new file mode 100644
index 00000000000..39a736e1937
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchMultiPostAggregator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Sets;
+
+import io.druid.query.aggregation.PostAggregator;
+
+/**
+ * Base class for post aggs taking multiple sketches as input
+ */
+public abstract class ArrayOfDoublesSketchMultiPostAggregator extends ArrayOfDoublesSketchPostAggregator
+{
+
+ private final List fields;
+ private Set dependentFields;
+
+ @JsonCreator
+ public ArrayOfDoublesSketchMultiPostAggregator(final String name, final List fields)
+ {
+ super(name);
+ this.fields = fields;
+ }
+
+ @Override
+ public Set getDependentFields()
+ {
+ if (dependentFields == null) {
+ dependentFields = Sets.newHashSet(super.getDependentFields());
+ for (final PostAggregator field : fields) {
+ dependentFields.addAll(field.getDependentFields());
+ }
+ }
+ return dependentFields;
+ }
+
+ @JsonProperty
+ public List getFields()
+ {
+ return fields;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "{"
+ + "name='" + getName() + '\''
+ + ", fields=" + fields
+ + "}";
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (!super.equals(o)) {
+ return false;
+ }
+ // this check is used here instead of instanceof because this is an abstract class
+ // and subclasses not overriding equals should not be equal to each other
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ArrayOfDoublesSketchMultiPostAggregator that = (ArrayOfDoublesSketchMultiPostAggregator) o;
+ return fields.equals(that.fields);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), fields);
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchNoOpAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchNoOpAggregator.java
new file mode 100644
index 00000000000..593ab163bed
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchNoOpAggregator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder;
+
+import io.druid.query.aggregation.Aggregator;
+
+public class ArrayOfDoublesSketchNoOpAggregator implements Aggregator
+{
+
+ private final ArrayOfDoublesSketch emptySketch;
+
+ public ArrayOfDoublesSketchNoOpAggregator(final int numberOfValues)
+ {
+ emptySketch = new ArrayOfDoublesUpdatableSketchBuilder().setNumberOfValues(numberOfValues).build().compact();
+ }
+
+ @Override
+ public void aggregate()
+ {
+ }
+
+ @Override
+ public Object get()
+ {
+ return emptySketch;
+ }
+
+ @Override
+ public float getFloat()
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getLong()
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchNoOpBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchNoOpBufferAggregator.java
new file mode 100644
index 00000000000..127481043ea
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchNoOpBufferAggregator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.nio.ByteBuffer;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUpdatableSketchBuilder;
+
+import io.druid.query.aggregation.BufferAggregator;
+import io.druid.query.monomorphicprocessing.RuntimeShapeInspector;
+
+public class ArrayOfDoublesSketchNoOpBufferAggregator implements BufferAggregator
+{
+
+ private final ArrayOfDoublesSketch emptySketch;
+
+ public ArrayOfDoublesSketchNoOpBufferAggregator(final int numberOfValues)
+ {
+ emptySketch = new ArrayOfDoublesUpdatableSketchBuilder().setNumberOfValues(numberOfValues).build().compact();
+ }
+
+ @Override
+ public void init(final ByteBuffer buf, final int position)
+ {
+ }
+
+ @Override
+ public void aggregate(final ByteBuffer buf, final int position)
+ {
+ }
+
+ @Override
+ public Object get(final ByteBuffer buf, final int position)
+ {
+ return emptySketch;
+ }
+
+ @Override
+ public float getFloat(final ByteBuffer buf, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public long getLong(final ByteBuffer buf, final int position)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ }
+
+ @Override
+ public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
+ {
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java
new file mode 100644
index 00000000000..4aadba5215a
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchObjectStrategy.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import io.druid.segment.data.ObjectStrategy;
+
+import java.nio.ByteBuffer;
+
+import javax.annotation.Nullable;
+
+import com.yahoo.memory.Memory;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketches;
+
+public class ArrayOfDoublesSketchObjectStrategy implements ObjectStrategy
+{
+
+ static final ArrayOfDoublesSketchObjectStrategy STRATEGY = new ArrayOfDoublesSketchObjectStrategy();
+
+ @Override
+ public int compare(final ArrayOfDoublesSketch s1, final ArrayOfDoublesSketch s2)
+ {
+ return ArrayOfDoublesSketchAggregatorFactory.COMPARATOR.compare(s1, s2);
+ }
+
+ @Override
+ public Class extends ArrayOfDoublesSketch> getClazz()
+ {
+ return ArrayOfDoublesSketch.class;
+ }
+
+ @Override
+ public ArrayOfDoublesSketch fromByteBuffer(final ByteBuffer buffer, final int numBytes)
+ {
+ return ArrayOfDoublesSketches.wrapSketch(Memory.wrap(buffer).region(buffer.position(), numBytes));
+ }
+
+ @Override
+ @Nullable
+ public byte[] toBytes(@Nullable final ArrayOfDoublesSketch sketch)
+ {
+ if (sketch == null) {
+ return null;
+ }
+ return sketch.toByteArray();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchOperations.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchOperations.java
new file mode 100644
index 00000000000..a6b68c545a6
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchOperations.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.yahoo.memory.Memory;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.codec.binary.Base64;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesAnotB;
+import com.yahoo.sketches.tuple.ArrayOfDoublesCombiner;
+import com.yahoo.sketches.tuple.ArrayOfDoublesIntersection;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketches;
+import com.yahoo.sketches.tuple.ArrayOfDoublesUnion;
+
+import io.druid.java.util.common.IAE;
+import io.druid.java.util.common.ISE;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSetOperationBuilder;
+
+public class ArrayOfDoublesSketchOperations
+{
+
+ public enum Operation
+ {
+ UNION {
+ @Override
+ public ArrayOfDoublesSketch apply(final int nominalEntries, final int numberOfValues, final ArrayOfDoublesSketch[] sketches)
+ {
+ final ArrayOfDoublesUnion union = new ArrayOfDoublesSetOperationBuilder().setNominalEntries(nominalEntries)
+ .setNumberOfValues(numberOfValues).buildUnion();
+ for (final ArrayOfDoublesSketch sketch : sketches) {
+ union.update(sketch);
+ }
+ return union.getResult();
+ }
+ },
+ INTERSECT {
+ @Override
+ public ArrayOfDoublesSketch apply(final int nominalEntries, final int numberOfValues, final ArrayOfDoublesSketch[] sketches)
+ {
+ final ArrayOfDoublesIntersection intersection = new ArrayOfDoublesSetOperationBuilder()
+ .setNominalEntries(nominalEntries).setNumberOfValues(numberOfValues).buildIntersection();
+ for (final ArrayOfDoublesSketch sketch : sketches) {
+ intersection.update(sketch, COMBINER);
+ }
+ return intersection.getResult();
+ }
+ },
+ NOT {
+ @Override
+ public ArrayOfDoublesSketch apply(final int nominalEntries, final int numberOfValues, final ArrayOfDoublesSketch[] sketches)
+ {
+ if (sketches.length < 1) {
+ throw new IAE("A-Not-B requires at least 1 sketch");
+ }
+
+ if (sketches.length == 1) {
+ return sketches[0];
+ }
+
+ ArrayOfDoublesSketch result = sketches[0];
+ for (int i = 1; i < sketches.length; i++) {
+ final ArrayOfDoublesAnotB aNotB = new ArrayOfDoublesSetOperationBuilder().setNumberOfValues(numberOfValues)
+ .buildAnotB();
+ aNotB.update(result, sketches[i]);
+ result = aNotB.getResult();
+ }
+ return result;
+ }
+ };
+
+ public abstract ArrayOfDoublesSketch apply(int nominalEntries, int numberOfValues, ArrayOfDoublesSketch[] sketches);
+ }
+
+ // This is how to combine values for sketch intersection.
+ // Might not fit all use cases.
+ private static ArrayOfDoublesCombiner COMBINER = new ArrayOfDoublesCombiner()
+ {
+ @Override
+ public double[] combine(final double[] a, final double[] b)
+ {
+ final double[] result = new double[a.length];
+ for (int i = 0; i < a.length; i++) {
+ result[i] = a[i] + b[i];
+ }
+ return result;
+ }
+ };
+
+ public static ArrayOfDoublesSketch deserialize(final Object serializedSketch)
+ {
+ if (serializedSketch instanceof String) {
+ return deserializeFromBase64EncodedString((String) serializedSketch);
+ } else if (serializedSketch instanceof byte[]) {
+ return deserializeFromByteArray((byte[]) serializedSketch);
+ } else if (serializedSketch instanceof ArrayOfDoublesSketch) {
+ return (ArrayOfDoublesSketch) serializedSketch;
+ }
+ throw new ISE("Object is not of a type that can deserialize to sketch: %s", serializedSketch.getClass());
+ }
+
+ public static ArrayOfDoublesSketch deserializeFromBase64EncodedString(final String str)
+ {
+ return deserializeFromByteArray(Base64.decodeBase64(str.getBytes(StandardCharsets.UTF_8)));
+ }
+
+ public static ArrayOfDoublesSketch deserializeFromByteArray(final byte[] data)
+ {
+ final Memory mem = Memory.wrap(data);
+ return ArrayOfDoublesSketches.wrapSketch(mem);
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchPostAggregator.java
new file mode 100644
index 00000000000..a9bcf495d30
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchPostAggregator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.PostAggregator;
+
+/**
+ * Base class for all post aggs
+ */
+public abstract class ArrayOfDoublesSketchPostAggregator implements PostAggregator
+{
+
+ private final String name;
+
+ public ArrayOfDoublesSketchPostAggregator(final String name)
+ {
+ this.name = Preconditions.checkNotNull(name, "name is null");
+ }
+
+ @Override
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public Set getDependentFields()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ArrayOfDoublesSketchPostAggregator)) {
+ return false;
+ }
+ final ArrayOfDoublesSketchPostAggregator that = (ArrayOfDoublesSketchPostAggregator) o;
+ return name.equals(that.getName());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return name.hashCode();
+ }
+
+ @Override
+ public PostAggregator decorate(final Map map)
+ {
+ return this;
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchSetOpPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchSetOpPostAggregator.java
new file mode 100644
index 00000000000..584c14ba6b8
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchSetOpPostAggregator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.Util;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns a result of a specified set operation on the given array of sketches. Supported operations are:
+ * union, intersection and set difference (UNION, INTERSECT, NOT).
+ */
+public class ArrayOfDoublesSketchSetOpPostAggregator extends ArrayOfDoublesSketchMultiPostAggregator
+{
+
+ private final ArrayOfDoublesSketchOperations.Operation operation;
+ private final int nominalEntries;
+ private final int numberOfValues;
+
+ @JsonCreator
+ public ArrayOfDoublesSketchSetOpPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("operation") final String operation,
+ @JsonProperty("nominalEntries") @Nullable final Integer nominalEntries,
+ @JsonProperty("numberOfValues") @Nullable final Integer numberOfValues,
+ @JsonProperty("fields") List fields
+ )
+ {
+ super(name, fields);
+ this.operation = ArrayOfDoublesSketchOperations.Operation.valueOf(operation);
+ this.nominalEntries = nominalEntries == null ? Util.DEFAULT_NOMINAL_ENTRIES : nominalEntries;
+ this.numberOfValues = numberOfValues == null ? 1 : numberOfValues;
+ Util.checkIfPowerOf2(this.nominalEntries, "size");
+
+ if (fields.size() <= 1) {
+ throw new IAE("Illegal number of fields[%d], must be > 1", fields.size());
+ }
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return ArrayOfDoublesSketchAggregatorFactory.COMPARATOR;
+ }
+
+ @Override
+ public ArrayOfDoublesSketch compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch[] sketches = new ArrayOfDoublesSketch[getFields().size()];
+ for (int i = 0; i < sketches.length; i++) {
+ sketches[i] = (ArrayOfDoublesSketch) getFields().get(i).compute(combinedAggregators);
+ }
+ return operation.apply(nominalEntries, numberOfValues, sketches);
+ }
+
+ @JsonProperty
+ public String getOperation()
+ {
+ return operation.toString();
+ }
+
+ @JsonProperty
+ public int getNominalEntries()
+ {
+ return nominalEntries;
+ }
+
+ @JsonProperty
+ public int getNumberOfValues()
+ {
+ return numberOfValues;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "{"
+ + "name='" + getName() + '\''
+ + ", fields=" + getFields()
+ + ", operation=" + operation
+ + ", nominalEntries=" + nominalEntries
+ + ", numberOfValues=" + numberOfValues
+ + "}";
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (!super.equals(o)) {
+ return false;
+ }
+ if (!(o instanceof ArrayOfDoublesSketchSetOpPostAggregator)) {
+ return false;
+ }
+ final ArrayOfDoublesSketchSetOpPostAggregator that = (ArrayOfDoublesSketchSetOpPostAggregator) o;
+ if (nominalEntries != that.nominalEntries) {
+ return false;
+ }
+ if (numberOfValues != that.numberOfValues) {
+ return false;
+ }
+ return operation.equals(that.operation);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), operation, nominalEntries, numberOfValues);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_SET_OP_CACHE_TYPE_ID)
+ .appendCacheables(getFields())
+ .appendInt(nominalEntries)
+ .appendInt(numberOfValues)
+ .appendString(operation.toString())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchTTestPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchTTestPostAggregator.java
new file mode 100644
index 00000000000..30f748c9c9f
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchTTestPostAggregator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+import org.apache.commons.math3.stat.inference.TTest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Performs Student's t-test and returns a list of p-values given two instances of {@link ArrayOfDoublesSketch}.
+ * The result will be N double values, where N is the number of double values kept in the sketch per key.
+ * See Student's t-test
+ */
+public class ArrayOfDoublesSketchTTestPostAggregator extends ArrayOfDoublesSketchMultiPostAggregator
+{
+
+ @JsonCreator
+ public ArrayOfDoublesSketchTTestPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("fields") List fields
+ )
+ {
+ super(name, fields);
+ if (fields.size() != 2) {
+ throw new IAE("Illegal number of fields[%d], must be 2", fields.size());
+ }
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing arrays of p values is not supported");
+ }
+
+ @Override
+ public double[] compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch1 = (ArrayOfDoublesSketch) getFields().get(0).compute(combinedAggregators);
+ final ArrayOfDoublesSketch sketch2 = (ArrayOfDoublesSketch) getFields().get(1).compute(combinedAggregators);
+ if (sketch1.getNumValues() != sketch2.getNumValues()) {
+ throw new IAE(
+ "Sketches have different number of values: %d and %d",
+ sketch1.getNumValues(),
+ sketch2.getNumValues()
+ );
+ }
+
+ final SummaryStatistics[] stats1 = getStats(sketch1);
+ final SummaryStatistics[] stats2 = getStats(sketch2);
+
+ final int numberOfValues = sketch1.getNumValues();
+ final double[] pValues = new double[numberOfValues];
+ final TTest test = new TTest();
+ for (int i = 0; i < pValues.length; i++) {
+ pValues[i] = test.tTest(stats1[i], stats2[i]);
+ }
+ return pValues;
+ }
+
+ private static SummaryStatistics[] getStats(final ArrayOfDoublesSketch sketch)
+ {
+ final SummaryStatistics[] stats = new SummaryStatistics[sketch.getNumValues()];
+ Arrays.setAll(stats, i -> new SummaryStatistics());
+ final ArrayOfDoublesSketchIterator it = sketch.iterator();
+ while (it.next()) {
+ final double[] values = it.getValues();
+ for (int i = 0; i < values.length; i++) {
+ stats[i].addValue(values[i]);
+ }
+ }
+ return stats;
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_T_TEST_CACHE_TYPE_ID)
+ .appendCacheables(getFields())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator.java
new file mode 100644
index 00000000000..9d5d29798d2
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns a distinct count estimate and error bounds from a given {@link ArrayOfDoublesSketch}."
+ * The result will be three double values: estimate of the number of distinct keys, lower bound and upper bound.
+ * The bounds are provided at the given number of standard deviations (optional, defaults to 1).
+ * This must be an integer value of 1, 2 or 3 corresponding to approximately 68.3%, 95.4% and 99.7%
+ * confidence intervals.
+ */
+public class ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ private final int numStdDevs;
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field,
+ @JsonProperty("numStdDevs") @Nullable final Integer numStdDevs
+ )
+ {
+ super(name, field);
+ this.numStdDevs = numStdDevs == null ? 1 : numStdDevs;
+ }
+
+ @JsonProperty
+ public int getNumStdDevs()
+ {
+ return numStdDevs;
+ }
+
+ @Override
+ public double[] compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ return new double[] {sketch.getEstimate(), sketch.getLowerBound(numStdDevs), sketch.getUpperBound(numStdDevs)};
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing arrays of estimates and error bounds is not supported");
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "{" +
+ "name='" + getName() + '\'' +
+ ", field=" + getField() +
+ ", numStdDevs=" + numStdDevs +
+ "}";
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (!super.equals(o)) {
+ return false;
+ }
+ if (!(o instanceof ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator)) {
+ return false;
+ }
+ final ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator that = (ArrayOfDoublesSketchToEstimateAndBoundsPostAggregator) o;
+ if (numStdDevs != that.numStdDevs) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), numStdDevs);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_ESTIMATE_AND_BOUNDS_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .appendInt(numStdDevs)
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToEstimatePostAggregator.java
new file mode 100644
index 00000000000..fd20c710ad9
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToEstimatePostAggregator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns a distinct count estimate from a given {@link ArrayOfDoublesSketch}.
+ */
+public class ArrayOfDoublesSketchToEstimatePostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToEstimatePostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field
+ )
+ {
+ super(name, field);
+ }
+
+ @Override
+ public Double compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ return sketch.getEstimate();
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_ESTIMATE_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMeansPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMeansPostAggregator.java
new file mode 100644
index 00000000000..dc3f778dffc
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToMeansPostAggregator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns a list of mean values from a given {@link ArrayOfDoublesSketch}.
+ * The result will be N double values, where N is the number of double values kept in the sketch per key.
+ */
+public class ArrayOfDoublesSketchToMeansPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToMeansPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field
+ )
+ {
+ super(name, field);
+ }
+
+ @Override
+ public double[] compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ final SummaryStatistics[] stats = new SummaryStatistics[sketch.getNumValues()];
+ Arrays.setAll(stats, i -> new SummaryStatistics());
+ final ArrayOfDoublesSketchIterator it = sketch.iterator();
+ while (it.next()) {
+ final double[] values = it.getValues();
+ for (int i = 0; i < values.length; i++) {
+ stats[i].addValue(values[i]);
+ }
+ }
+ final double[] means = new double[sketch.getNumValues()];
+ Arrays.setAll(means, i -> stats[i].getMean());
+ return means;
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing arrays of mean values is not supported");
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_MEANS_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToNumEntriesPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToNumEntriesPostAggregator.java
new file mode 100644
index 00000000000..0955b44d843
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToNumEntriesPostAggregator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns the number of retained entries from a given {@link ArrayOfDoublesSketch}.
+ */
+public class ArrayOfDoublesSketchToNumEntriesPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToNumEntriesPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field
+ )
+ {
+ super(name, field);
+ }
+
+ @Override
+ public Integer compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ return sketch.getRetainedEntries();
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return Comparator.naturalOrder();
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_NUM_ENTRIES_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToQuantilesSketchPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToQuantilesSketchPostAggregator.java
new file mode 100644
index 00000000000..8aabbcfbfc3
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToQuantilesSketchPostAggregator.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Comparator;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.Nullable;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
+import io.druid.query.cache.CacheKeyBuilder;
+
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator;
+import com.yahoo.sketches.quantiles.DoublesSketch;
+import com.yahoo.sketches.quantiles.UpdateDoublesSketch;
+
+/**
+ * Returns a quanitles {@link DoublesSketch} constructed from a given column of double values from a given
+ * {@link ArrayOfDoublesSketch} using parameter k that determines the accuracy and size of the quantiles sketch.
+ * The column number is optional (the default is 1).
+ * The parameter k is optional (the default is defined in the sketch library).
+ * The result is a quantiles sketch.
+ * See Quantiles Sketch Overview
+ */
+public class ArrayOfDoublesSketchToQuantilesSketchPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ private static final int DEFAULT_QUANTILES_SKETCH_SIZE = 128;
+
+ private final int column;
+ private final int k;
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToQuantilesSketchPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field,
+ @JsonProperty("column") @Nullable final Integer column,
+ @JsonProperty("k") @Nullable final Integer k
+ )
+ {
+ super(name, field);
+ this.column = column == null ? 1 : column;
+ this.k = k == null ? DEFAULT_QUANTILES_SKETCH_SIZE : k;
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ return DoublesSketchAggregatorFactory.COMPARATOR;
+ }
+
+ @Override
+ public DoublesSketch compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ final UpdateDoublesSketch qs = UpdateDoublesSketch.builder().setK(k).build();
+ final ArrayOfDoublesSketchIterator it = sketch.iterator();
+ while (it.next()) {
+ qs.update(it.getValues()[column - 1]); // convert 1-based column number to zero-based index
+ }
+ return qs;
+ }
+
+ @JsonProperty
+ public int getColumn()
+ {
+ return column;
+ }
+
+ @JsonProperty
+ public int getK()
+ {
+ return k;
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "{" +
+ "name='" + getName() + '\'' +
+ ", field=" + getField() +
+ ", column=" + column +
+ ", k=" + k +
+ "}";
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (!super.equals(o)) {
+ return false;
+ }
+ if (!(o instanceof ArrayOfDoublesSketchToQuantilesSketchPostAggregator)) {
+ return false;
+ }
+ final ArrayOfDoublesSketchToQuantilesSketchPostAggregator that = (ArrayOfDoublesSketchToQuantilesSketchPostAggregator) o;
+ if (column != that.column) {
+ return false;
+ }
+ if (k != that.k) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), column, k);
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_QUANTILES_SKETCH_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .appendInt(column)
+ .appendInt(k)
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToStringPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToStringPostAggregator.java
new file mode 100644
index 00000000000..8350f8e4974
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToStringPostAggregator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Comparator;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns a human-readable summary of a given {@link ArrayOfDoublesSketch}.
+ * This is a string returned by toString() method of the sketch.
+ * This can be useful for debugging.
+ */
+public class ArrayOfDoublesSketchToStringPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToStringPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field
+ )
+ {
+ super(name, field);
+ }
+
+ @Override
+ public String compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ return sketch.toString();
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing sketch summaries is not supported");
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_STRING_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToVariancesPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToVariancesPostAggregator.java
new file mode 100644
index 00000000000..8b4403542b7
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchToVariancesPostAggregator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Map;
+
+import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketch;
+import com.yahoo.sketches.tuple.ArrayOfDoublesSketchIterator;
+
+import io.druid.java.util.common.IAE;
+import io.druid.query.aggregation.AggregatorUtil;
+import io.druid.query.aggregation.PostAggregator;
+import io.druid.query.cache.CacheKeyBuilder;
+
+/**
+ * Returns a list of variance values from a given {@link ArrayOfDoublesSketch}.
+ * The result will be N double values, where N is the number of double values kept in the sketch per key.
+ */
+public class ArrayOfDoublesSketchToVariancesPostAggregator extends ArrayOfDoublesSketchUnaryPostAggregator
+{
+
+ @JsonCreator
+ public ArrayOfDoublesSketchToVariancesPostAggregator(
+ @JsonProperty("name") final String name,
+ @JsonProperty("field") final PostAggregator field
+ )
+ {
+ super(name, field);
+ }
+
+ @Override
+ public double[] compute(final Map combinedAggregators)
+ {
+ final ArrayOfDoublesSketch sketch = (ArrayOfDoublesSketch) getField().compute(combinedAggregators);
+ final SummaryStatistics[] stats = new SummaryStatistics[sketch.getNumValues()];
+ Arrays.setAll(stats, i -> new SummaryStatistics());
+ final ArrayOfDoublesSketchIterator it = sketch.iterator();
+ while (it.next()) {
+ final double[] values = it.getValues();
+ for (int i = 0; i < values.length; i++) {
+ stats[i].addValue(values[i]);
+ }
+ }
+ final double[] variances = new double[sketch.getNumValues()];
+ Arrays.setAll(variances, i -> stats[i].getVariance());
+ return variances;
+ }
+
+ @Override
+ public Comparator getComparator()
+ {
+ throw new IAE("Comparing arrays of variance values is not supported");
+ }
+
+ @Override
+ public byte[] getCacheKey()
+ {
+ return new CacheKeyBuilder(AggregatorUtil.ARRAY_OF_DOUBLES_SKETCH_TO_VARIANCES_CACHE_TYPE_ID)
+ .appendCacheable(getField())
+ .build();
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchUnaryPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchUnaryPostAggregator.java
new file mode 100644
index 00000000000..3db041feb9a
--- /dev/null
+++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchUnaryPostAggregator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+import io.druid.query.aggregation.PostAggregator;
+
+/**
+ * Base class for post aggs taking one sketch as input
+ */
+public abstract class ArrayOfDoublesSketchUnaryPostAggregator extends ArrayOfDoublesSketchPostAggregator
+{
+
+ private final PostAggregator field;
+ private Set dependentFields;
+
+ @JsonCreator
+ public ArrayOfDoublesSketchUnaryPostAggregator(
+ final String name,
+ final PostAggregator field
+ )
+ {
+ super(name);
+ this.field = Preconditions.checkNotNull(field, "field is null");
+ }
+
+ @JsonProperty
+ public PostAggregator getField()
+ {
+ return field;
+ }
+
+ @Override
+ public Set getDependentFields()
+ {
+ if (dependentFields == null) {
+ dependentFields = Sets.newHashSet(super.getDependentFields());
+ dependentFields.addAll(field.getDependentFields());
+ }
+ return dependentFields;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (!super.equals(o)) {
+ return false;
+ }
+ // this check is used here instead of instanceof because this is an abstract class
+ // and subclasses not overriding equals should not be equal to each other
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ArrayOfDoublesSketchUnaryPostAggregator that = (ArrayOfDoublesSketchUnaryPostAggregator) o;
+ return field.equals(that.getField());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), field);
+ }
+
+ @Override
+ public String toString()
+ {
+ return this.getClass().getSimpleName() + "{" +
+ "name='" + getName() + '\'' +
+ ", field=" + getField() +
+ "}";
+ }
+
+}
diff --git a/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
index dfc5b69e00f..319e10e0b88 100644
--- a/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
+++ b/extensions-core/datasketches/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -1,3 +1,4 @@
io.druid.query.aggregation.datasketches.theta.SketchModule
io.druid.query.aggregation.datasketches.theta.oldapi.OldApiSketchModule
io.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule
+io.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule
diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java
new file mode 100644
index 00000000000..838f5d428f9
--- /dev/null
+++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregationTest.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.query.aggregation.datasketches.tuple;
+
+import com.google.common.collect.Lists;
+import com.yahoo.sketches.quantiles.DoublesSketch;
+import io.druid.data.input.Row;
+import io.druid.initialization.DruidModule;
+import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.guava.Sequence;
+import io.druid.query.aggregation.AggregationTestHelper;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryRunnerTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class ArrayOfDoublesSketchAggregationTest
+{
+
+ private final AggregationTestHelper helper;
+
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public ArrayOfDoublesSketchAggregationTest(final GroupByQueryConfig config)
+ {
+ DruidModule module = new ArrayOfDoublesSketchModule();
+ module.configure(null);
+ helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
+ module.getJacksonModules(), config, tempFolder);
+ }
+
+ @Parameterized.Parameters(name = "{0}")
+ public static Collection> constructorFeeder()
+ {
+ final List