From 90c1a54b868a7e32a43895d77c79895bf242d8c4 Mon Sep 17 00:00:00 2001 From: Edward Gan Date: Wed, 13 Feb 2019 14:03:47 -0800 Subject: [PATCH] Moments Sketch custom aggregator (#6581) * Moments Sketch Integration with Druid * updates, add documentation, fix warnings * nits * disallowed base64 * update to druid 0.14 --- .../momentsketch-quantiles.md | 120 ++++++ extensions-contrib/momentsketch/pom.xml | 100 +++++ .../MomentSketchComplexMetricSerde.java | 92 ++++ .../MomentSketchJsonSerializer.java | 39 ++ .../momentsketch/MomentSketchModule.java | 82 ++++ .../MomentSketchObjectStrategy.java | 62 +++ .../momentsketch/MomentSketchWrapper.java | 189 +++++++++ .../MomentSketchAggregatorFactory.java | 294 +++++++++++++ .../MomentSketchBuildAggregator.java | 82 ++++ .../MomentSketchBuildBufferAggregator.java | 94 ++++ .../MomentSketchMaxPostAggregator.java | 130 ++++++ .../MomentSketchMergeAggregator.java | 75 ++++ .../MomentSketchMergeAggregatorFactory.java | 62 +++ .../MomentSketchMergeBufferAggregator.java | 110 +++++ .../MomentSketchMinPostAggregator.java | 129 ++++++ .../MomentSketchQuantilePostAggregator.java | 148 +++++++ .../momentsketch/MomentSketchWrapperTest.java | 53 +++ .../MomentsSketchAggregatorTest.java | 204 +++++++++ .../src/test/resources/doubles_build_data.tsv | 400 ++++++++++++++++++ pom.xml | 1 + .../query/aggregation/AggregatorUtil.java | 4 + .../aggregation/post/PostAggregatorIds.java | 3 + .../druid/query/cache/CacheKeyBuilder.java | 14 + .../druid/segment/serde/ComplexMetrics.java | 8 + 24 files changed, 2495 insertions(+) create mode 100644 docs/content/development/extensions-contrib/momentsketch-quantiles.md create mode 100644 extensions-contrib/momentsketch/pom.xml create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchComplexMetricSerde.java create mode 100644 
extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchJsonSerializer.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchModule.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchObjectStrategy.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchWrapper.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchAggregatorFactory.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildAggregator.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildBufferAggregator.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMaxPostAggregator.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeAggregator.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeAggregatorFactory.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeBufferAggregator.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMinPostAggregator.java create mode 100644 extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchQuantilePostAggregator.java create 
mode 100644 extensions-contrib/momentsketch/src/test/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchWrapperTest.java create mode 100644 extensions-contrib/momentsketch/src/test/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentsSketchAggregatorTest.java create mode 100644 extensions-contrib/momentsketch/src/test/resources/doubles_build_data.tsv diff --git a/docs/content/development/extensions-contrib/momentsketch-quantiles.md b/docs/content/development/extensions-contrib/momentsketch-quantiles.md new file mode 100644 index 00000000000..69bb93cffa1 --- /dev/null +++ b/docs/content/development/extensions-contrib/momentsketch-quantiles.md @@ -0,0 +1,120 @@ +--- +layout: doc_page +title: "Moment Sketches for Approximate Quantiles module" +--- + + + +# MomentSketch Quantiles Sketch module + +This module provides Druid aggregators for approximate quantile queries using the [momentsketch](https://github.com/stanford-futuredata/momentsketch) library. +The momentsketch provides coarse quantile estimates with less space and aggregation time overheads than traditional sketches, approaching the performance of counts and sums by reconstructing distributions from computed statistics. + +To use this aggregator, make sure you [include](../../operations/including-extensions.html) the extension in your config file: + +``` +druid.extensions.loadList=["druid-momentsketch"] +``` + +### Aggregator + +The result of the aggregation is a momentsketch that is the union of all sketches either built from raw data or read from the segments. + +The `momentSketch` aggregator operates over raw data while the `momentSketchMerge` aggregator should be used when aggregating pre-computed sketches. +```json +{ + "type" : <aggregator_type>, + "name" : <output_name>, + "fieldName" : <input_name>, + "k" : <int>, + "compress" : <boolean> + } +``` + +|property|description|required?| +|--------|-----------|---------| +|type|Type of aggregator desired. 
Either "momentSketch" or "momentSketchMerge" |yes| +|name|A String for the output (result) name of the calculation.|yes| +|fieldName|A String for the name of the input field (can contain sketches or raw numeric values).|yes| +|k|Parameter that determines the accuracy and size of the sketch. Higher k means higher accuracy but more space to store sketches. Usable range is generally [3,15] |no, defaults to 13.| +|compress|Flag for whether the aggregator compresses numeric values using arcsinh. Can improve robustness to skewed and long-tailed distributions, but reduces accuracy slightly on more uniform distributions.|no, defaults to true.| + +### Post Aggregators + +Users can query for a set of quantiles using the `momentSketchSolveQuantiles` post-aggregator on the sketches created by the `momentSketch` or `momentSketchMerge` aggregators. +```json +{ + "type" : "momentSketchSolveQuantiles", + "name" : <output_name>, + "field" : <reference to moment sketch>, + "fractions" : <array of doubles in [0,1]> +} +``` + +Users can also query for the min/max of a distribution: +```json +{ + "type" : "momentSketchMin" | "momentSketchMax", + "name" : <output_name>, + "field" : <reference to moment sketch> +} +``` + +### Example +As an example of a query with sketches pre-aggregated at ingestion time, one could set up the following aggregator at ingest: +```json +{ + "type": "momentSketch", + "name": "sketch", + "fieldName": "value", + "k": 10, + "compress": true +} +``` +and make queries using the following aggregator + post-aggregator: +```json +{ + "aggregations": [{ + "type": "momentSketchMerge", + "name": "sketch", + "fieldName": "sketch", + "k": 10, + "compress": true + }], + "postAggregations": [ + { + "type": "momentSketchSolveQuantiles", + "name": "quantiles", + "fractions": [0.1, 0.5, 0.9], + "field": { + "type": "fieldAccess", + "fieldName": "sketch" + } + }, + { + "type": "momentSketchMin", + "name": "min", + "field": { + "type": "fieldAccess", + "fieldName": "sketch" + } + }] +} +``` \ No newline at end of file diff --git a/extensions-contrib/momentsketch/pom.xml 
b/extensions-contrib/momentsketch/pom.xml new file mode 100644 index 00000000000..b8926413a99 --- /dev/null +++ b/extensions-contrib/momentsketch/pom.xml @@ -0,0 +1,100 @@ + + + + + + druid + org.apache.druid + 0.14.0-incubating-SNAPSHOT + ../../pom.xml + + 4.0.0 + + org.apache.druid.extensions.contrib + druid-momentsketch + druid-momentsketch + Aggregators for the approximate quantile moment sketch + + + + UTF-8 + 0.12.2 + + + + + com.github.stanford-futuredata.momentsketch + momentsketch-solver + 0.1.1 + + + com.google.guava + guava + ${guava.version} + provided + + + org.apache.druid + druid-core + ${project.parent.version} + provided + + + org.apache.druid + druid-processing + ${project.parent.version} + provided + + + junit + junit + test + + + org.easymock + easymock + test + + + org.apache.druid + druid-core + ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-processing + ${project.parent.version} + test-jar + test + + + org.apache.druid + druid-server + ${project.parent.version} + test + + + + + \ No newline at end of file diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchComplexMetricSerde.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchComplexMetricSerde.java new file mode 100644 index 00000000000..4e631e53525 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchComplexMetricSerde.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchAggregatorFactory; +import org.apache.druid.segment.GenericColumnSerializer; +import org.apache.druid.segment.column.ColumnBuilder; +import org.apache.druid.segment.data.GenericIndexed; +import org.apache.druid.segment.data.ObjectStrategy; +import org.apache.druid.segment.serde.ComplexColumnPartSupplier; +import org.apache.druid.segment.serde.ComplexMetricExtractor; +import org.apache.druid.segment.serde.ComplexMetricSerde; +import org.apache.druid.segment.serde.LargeColumnSupportedComplexColumnSerializer; +import org.apache.druid.segment.writeout.SegmentWriteOutMedium; + +import java.nio.ByteBuffer; + +public class MomentSketchComplexMetricSerde extends ComplexMetricSerde +{ + private static final MomentSketchObjectStrategy STRATEGY = new MomentSketchObjectStrategy(); + + @Override + public String getTypeName() + { + return MomentSketchAggregatorFactory.TYPE_NAME; + } + + @Override + public ComplexMetricExtractor getExtractor() + { + return new ComplexMetricExtractor() + { + @Override + public Class extractedClass() + { + return MomentSketchWrapper.class; + } + + @Override + public Object extractValue(final InputRow inputRow, final String metricName) + { + return (MomentSketchWrapper) inputRow.getRaw(metricName); + } + }; + } + + @Override + public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder) + { + final GenericIndexed column = 
GenericIndexed.read( + buffer, + STRATEGY, + builder.getFileMapper() + ); + builder.setComplexColumnSupplier(new ComplexColumnPartSupplier(getTypeName(), column)); + } + + @Override + public ObjectStrategy getObjectStrategy() + { + return STRATEGY; + } + + @Override + public GenericColumnSerializer getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column) + { + return LargeColumnSupportedComplexColumnSerializer.create( + segmentWriteOutMedium, + column, + this.getObjectStrategy() + ); + } + +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchJsonSerializer.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchJsonSerializer.java new file mode 100644 index 00000000000..268c4017a7a --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchJsonSerializer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.momentsketch; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +import java.io.IOException; + +public class MomentSketchJsonSerializer extends JsonSerializer +{ + @Override + public void serialize( + MomentSketchWrapper momentsSketch, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) throws IOException + { + jsonGenerator.writeBinary(momentsSketch.toByteArray()); + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchModule.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchModule.java new file mode 100644 index 00000000000..29f21355d26 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchModule.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.momentsketch; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchAggregatorFactory; +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchMaxPostAggregator; +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchMergeAggregatorFactory; +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchMinPostAggregator; +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchQuantilePostAggregator; +import org.apache.druid.segment.serde.ComplexMetrics; + +import java.util.List; + +/** + * Module defining aggregators for the moments approximate quantiles sketch + * @see MomentSketchAggregatorFactory + */ +public class MomentSketchModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule( + getClass().getSimpleName() + ).registerSubtypes( + new NamedType( + MomentSketchAggregatorFactory.class, + MomentSketchAggregatorFactory.TYPE_NAME + ), + new NamedType( + MomentSketchMergeAggregatorFactory.class, + MomentSketchMergeAggregatorFactory.TYPE_NAME + ), + new NamedType( + MomentSketchQuantilePostAggregator.class, + MomentSketchQuantilePostAggregator.TYPE_NAME + ), + new NamedType( + MomentSketchMinPostAggregator.class, + MomentSketchMinPostAggregator.TYPE_NAME + ), + new NamedType( + MomentSketchMaxPostAggregator.class, + MomentSketchMaxPostAggregator.TYPE_NAME + ) + ).addSerializer(MomentSketchWrapper.class, new MomentSketchJsonSerializer()) + ); + } + + @Override + public void configure(Binder binder) + { + ComplexMetrics.registerSerde( + 
MomentSketchAggregatorFactory.TYPE_NAME, + MomentSketchComplexMetricSerde::new + ); + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchObjectStrategy.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchObjectStrategy.java new file mode 100644 index 00000000000..7a706f90399 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchObjectStrategy.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.query.aggregation.momentsketch; + +import org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchAggregatorFactory; +import org.apache.druid.segment.data.ObjectStrategy; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class MomentSketchObjectStrategy implements ObjectStrategy +{ + private static final byte[] EMPTY_BYTES = new byte[0]; + + @Override + public Class getClazz() + { + return MomentSketchWrapper.class; + } + + @Override + public MomentSketchWrapper fromByteBuffer(ByteBuffer buffer, int numBytes) + { + if (numBytes == 0) { + return null; + } + buffer.limit(buffer.position() + numBytes); + return MomentSketchWrapper.fromBytes(buffer); + } + + @Override + public byte[] toBytes(@Nullable MomentSketchWrapper val) + { + if (val == null) { + return EMPTY_BYTES; + } + return val.toByteArray(); + } + + @Override + public int compare(MomentSketchWrapper o1, MomentSketchWrapper o2) + { + return MomentSketchAggregatorFactory.COMPARATOR.compare(o1, o2); + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchWrapper.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchWrapper.java new file mode 100644 index 00000000000..41ecf30fb84 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/MomentSketchWrapper.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch; + +import com.github.stanfordfuturedata.momentsketch.MomentSolver; +import com.github.stanfordfuturedata.momentsketch.MomentStruct; + +import java.nio.ByteBuffer; + +/** + * Class for wrapping the operations of the moments sketch for use in + * the moment sketch aggregator + * {@link org.apache.druid.query.aggregation.momentsketch.aggregator.MomentSketchAggregatorFactory}. + * + * k controls the size and accuracy provided by the sketch. + * The arcsinh function is used to compress the range of data to allow for more robust results + * on skewed and long-tailed metrics, at the cost of slightly reduced accuracy on metrics with more uniform + * distributions. + */ +public class MomentSketchWrapper +{ + // The MomentStruct object stores the relevant statistics about a metric distribution. 
+ protected MomentStruct data; + // Whether we use arcsinh to compress the range + protected boolean useArcSinh = true; + + public MomentSketchWrapper(int k) + { + data = new MomentStruct(k); + } + + public MomentSketchWrapper(MomentStruct data) + { + this.data = data; + } + + public void setCompressed(boolean flag) + { + useArcSinh = flag; + } + + public boolean getCompressed() + { + return useArcSinh; + } + + public int getK() + { + return data.power_sums.length; + } + + public double[] getPowerSums() + { + return data.power_sums; + } + + public double getMin() + { + if (useArcSinh) { + return Math.sinh(data.min); + } else { + return data.min; + } + } + + public double getMax() + { + if (useArcSinh) { + return Math.sinh(data.max); + } else { + return data.max; + } + } + + public void add(double rawX) + { + double x = rawX; + if (useArcSinh) { + // Since Java does not have a native arcsinh implementation we + // compute it manually using the following formula. + // This is the inverse operation of Math.sinh + x = Math.log(rawX + Math.sqrt(1 + rawX * rawX)); + } + data.add(x); + } + + public void merge(MomentSketchWrapper other) + { + data.merge(other.data); + } + + public byte[] toByteArray() + { + ByteBuffer bb = ByteBuffer.allocate(2 * Integer.BYTES + (data.power_sums.length + 2) * Double.BYTES); + return toBytes(bb).array(); + } + + public MomentSolver getSolver() + { + MomentSolver ms = new MomentSolver(data); + return ms; + } + + /** + * Estimates quantiles given the statistics in a moments sketch. + * @param fractions real values between [0,1] for which we want to estimate quantiles + * + * @return estimated quantiles. + */ + public double[] getQuantiles(double[] fractions) + { + // The solver attempts to construct a distribution estimate which matches the + // statistics tracked by the moments sketch. We can then read off quantile estimates + // from the reconstructed distribution. 
+ // This operation can be relatively expensive (~1 ms) so we set the parameters for distribution + // reconstruction to conservative values. + MomentSolver ms = new MomentSolver(data); + // Constants here are chosen to yield maximum precision while keeping solve times ~1 ms on a 2 GHz CPU. + // Grid size can be increased if longer solve times are acceptable + ms.setGridSize(1024); + ms.setMaxIter(15); + ms.solve(); + double[] rawQuantiles = ms.getQuantiles(fractions); + for (int i = 0; i < fractions.length; i++) { + if (useArcSinh) { + rawQuantiles[i] = Math.sinh(rawQuantiles[i]); + } + } + return rawQuantiles; + } + + public ByteBuffer toBytes(ByteBuffer bb) + { + int compressedInt = getCompressed() ? 1 : 0; + bb.putInt(data.power_sums.length); + bb.putInt(compressedInt); + bb.putDouble(data.min); + bb.putDouble(data.max); + for (double x : data.power_sums) { + bb.putDouble(x); + } + return bb; + } + + public static MomentSketchWrapper fromBytes(ByteBuffer bb) + { + int k = bb.getInt(); + int compressedInt = bb.getInt(); + boolean compressed = (compressedInt > 0); + MomentStruct m = new MomentStruct(k); + m.min = bb.getDouble(); + m.max = bb.getDouble(); + for (int i = 0; i < k; i++) { + m.power_sums[i] = bb.getDouble(); + } + MomentSketchWrapper mw = new MomentSketchWrapper(m); + mw.setCompressed(compressed); + return mw; + } + + public static MomentSketchWrapper fromByteArray(byte[] input) + { + ByteBuffer bb = ByteBuffer.wrap(input); + return fromBytes(bb); + } + + @Override + public String toString() + { + return data.toString(); + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchAggregatorFactory.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchAggregatorFactory.java new file mode 100644 index 00000000000..918ad3e04ed --- /dev/null +++ 
b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchAggregatorFactory.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch.aggregator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.AggregatorFactoryNotMergeableException; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper; +import org.apache.druid.query.cache.CacheKeyBuilder; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ValueType; + +import javax.annotation.Nullable; +import 
java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; + +/** + * Aggregation operations over the moment-based quantile sketch + * available on github and described + * in the paper Moment-based quantile sketches. + * + * This sketch stores a set of (k) statistics about univariate metrics that can be used to + * solve for approximate quantiles of the original distribution at query time after aggregating + * the statistics. + */ +public class MomentSketchAggregatorFactory extends AggregatorFactory +{ + // Default number of moments (k) chosen for ~1% quantile error. + public static final int DEFAULT_K = 13; + // Safer to compress data with unknown ranges by default, but reduces accuracy on uniform data + public static final boolean DEFAULT_COMPRESS = true; + + private final String name; + private final String fieldName; + // Number of moments tracked. Larger k allows for better estimates but greater resource usage + private final int k; + // Controls whether or not data is compressed onto a smaller range using arcsinh + private final boolean compress; + private final byte cacheTypeId; + + public static final String TYPE_NAME = "momentSketch"; + + @JsonCreator + public MomentSketchAggregatorFactory( + @JsonProperty("name") final String name, + @JsonProperty("fieldName") final String fieldName, + @Nullable @JsonProperty("k") final Integer k, + @Nullable @JsonProperty("compress") final Boolean compress + ) + { + this(name, fieldName, k, compress, AggregatorUtil.MOMENTS_SKETCH_BUILD_CACHE_TYPE_ID); + } + + MomentSketchAggregatorFactory( + final String name, + final String fieldName, + @Nullable final Integer k, + @Nullable final Boolean compress, + final byte cacheTypeId + ) + { + Objects.requireNonNull(name, "Must have a valid, non-null aggregator name"); + this.name = name; + Objects.requireNonNull(fieldName, "Parameter fieldName must be specified"); + this.fieldName = fieldName; + this.k = k == null ? 
DEFAULT_K : k; + this.compress = compress == null ? DEFAULT_COMPRESS : compress; + this.cacheTypeId = cacheTypeId; + } + + + @Override + public byte[] getCacheKey() + { + return new CacheKeyBuilder( + cacheTypeId + ).appendString(fieldName).appendInt(k).appendBoolean(compress).build(); + } + + + @Override + public Aggregator factorize(ColumnSelectorFactory metricFactory) + { + ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName); + if (cap == null || ValueType.isNumeric(cap.getType())) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + return new MomentSketchBuildAggregator(selector, k, getCompress()); + } else { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + return new MomentSketchMergeAggregator(selector, k, getCompress()); + } + } + + @Override + public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory) + { + ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName); + if (cap == null || ValueType.isNumeric(cap.getType())) { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + return new MomentSketchBuildBufferAggregator(selector, k, getCompress()); + } else { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName); + return new MomentSketchMergeBufferAggregator(selector, k, getCompress()); + } + } + + public static final Comparator COMPARATOR = Comparator.nullsFirst( + Comparator.comparingDouble(a -> a.getPowerSums()[0]) + ); + + @Override + public Comparator getComparator() + { + return COMPARATOR; + } + + @Override + public Object combine(@Nullable Object lhs, @Nullable Object rhs) + { + if (lhs == null) { + return rhs; + } + if (rhs == null) { + return lhs; + } + MomentSketchWrapper union = (MomentSketchWrapper) lhs; + union.merge((MomentSketchWrapper) rhs); + return union; + } + + @Override + public AggregatorFactory getCombiningFactory() + { + return 
new MomentSketchMergeAggregatorFactory(name, k, compress); + } + + @Override + public AggregatorFactory getMergingFactory(AggregatorFactory other) throws AggregatorFactoryNotMergeableException + { + if (other.getName().equals(this.getName()) && this.getClass() == other.getClass()) { + return getCombiningFactory(); + } else { + throw new AggregatorFactoryNotMergeableException(this, other); + } + } + + @Override + public List getRequiredColumns() + { + return Collections.singletonList( + new MomentSketchAggregatorFactory( + fieldName, + fieldName, + k, + compress + ) + ); + } + + private MomentSketchWrapper deserializeFromByteArray(byte[] bytes) + { + return MomentSketchWrapper.fromByteArray(bytes); + } + + @Override + public Object deserialize(Object serializedSketch) + { + if (serializedSketch instanceof String) { + String str = (String) serializedSketch; + return deserializeFromByteArray(StringUtils.decodeBase64(StringUtils.toUtf8(str))); + } else if (serializedSketch instanceof byte[]) { + return deserializeFromByteArray((byte[]) serializedSketch); + } else if (serializedSketch instanceof MomentSketchWrapper) { + return serializedSketch; + } + throw new ISE( + "Object cannot be deserialized to a Moments Sketch: " + + serializedSketch.getClass() + ); + } + + @Override + public Object finalizeComputation(Object object) + { + return object; + } + + @Override + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public String getFieldName() + { + return fieldName; + } + + @JsonProperty + public int getK() + { + return k; + } + + @JsonProperty + public boolean getCompress() + { + return compress; + } + + @Override + public List requiredFields() + { + return Collections.singletonList(fieldName); + } + + @Override + public String getTypeName() + { + return TYPE_NAME; + } + + @Override + public int getMaxIntermediateSize() + { + // k double precision moments, 2 doubles for the min and max + // one integer to specify the number of moments + 
// one integer to specify whether data range is compressed + return (k + 2) * Double.BYTES + 2 * Integer.BYTES; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) { + return true; + } + if (o == null || !getClass().equals(o.getClass())) { + return false; + } + final MomentSketchAggregatorFactory that = (MomentSketchAggregatorFactory) o; + + return Objects.equals(name, that.name) && + Objects.equals(fieldName, that.fieldName) && + k == that.k && + compress == that.compress; + } + + @Override + public int hashCode() + { + return Objects.hash(name, fieldName, k, compress); + } + + @Override + public String toString() + { + return getClass().getSimpleName() + "{" + + "name=" + name + + ", fieldName=" + fieldName + + ", k=" + k + + ", compress=" + compress + + "}"; + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildAggregator.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildAggregator.java new file mode 100644 index 00000000000..db3f4409831 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildAggregator.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch.aggregator; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper; +import org.apache.druid.segment.BaseDoubleColumnValueSelector; + +public class MomentSketchBuildAggregator implements Aggregator +{ + private final BaseDoubleColumnValueSelector valueSelector; + private final int k; + private final boolean compress; + + private MomentSketchWrapper momentsSketch; + + public MomentSketchBuildAggregator( + final BaseDoubleColumnValueSelector valueSelector, + final int k, + final boolean compress + ) + { + this.valueSelector = valueSelector; + this.k = k; + this.compress = compress; + momentsSketch = new MomentSketchWrapper(k); + momentsSketch.setCompressed(compress); + } + + @Override + public void aggregate() + { + momentsSketch.add(valueSelector.getDouble()); + } + + @Override + public Object get() + { + return momentsSketch; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("not implemented"); + } + + @Override + public Aggregator clone() + { + return new MomentSketchBuildAggregator(valueSelector, k, compress); + } + + @Override + public void close() + { + momentsSketch = null; + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildBufferAggregator.java 
b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildBufferAggregator.java new file mode 100644 index 00000000000..7f049cf1534 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchBuildBufferAggregator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.druid.query.aggregation.momentsketch.aggregator;

import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseDoubleColumnValueSelector;

import java.nio.ByteBuffer;

/**
 * Buffer-backed aggregator that builds a {@link MomentSketchWrapper} from raw double values.
 * The sketch is kept in serialized form inside the caller-provided buffer: every update
 * reads it back, modifies it and writes it out again at the same position.
 */
public class MomentSketchBuildBufferAggregator implements BufferAggregator
{
  private final BaseDoubleColumnValueSelector selector;
  private final int k;
  private final boolean compress;

  /**
   * @param valueSelector source of the double values to accumulate
   * @param k             passed to the {@link MomentSketchWrapper} constructor
   * @param compress      passed to {@link MomentSketchWrapper#setCompressed(boolean)}
   */
  public MomentSketchBuildBufferAggregator(
      final BaseDoubleColumnValueSelector valueSelector,
      final int k,
      final boolean compress
  )
  {
    this.selector = valueSelector;
    this.k = k;
    this.compress = compress;
  }

  /**
   * Writes an empty sketch into {@code buffer} starting at {@code position}.
   */
  @Override
  public synchronized void init(final ByteBuffer buffer, final int position)
  {
    final MomentSketchWrapper emptyStruct = new MomentSketchWrapper(k);
    emptyStruct.setCompressed(compress);
    emptyStruct.toBytes(viewAt(buffer, position));
  }

  /**
   * Adds the selector's current value to the sketch stored at {@code position}. Rows for which
   * the selector reports a null value are skipped and leave the buffer untouched.
   */
  @Override
  public synchronized void aggregate(final ByteBuffer buffer, final int position)
  {
    if (selector.isNull()) {
      return;
    }
    final ByteBuffer mutationBuffer = viewAt(buffer, position);

    final MomentSketchWrapper ms0 = MomentSketchWrapper.fromBytes(mutationBuffer);
    ms0.add(selector.getDouble());

    // Reading advanced the view; rewind it so the updated sketch overwrites the old one.
    mutationBuffer.position(position);
    ms0.toBytes(mutationBuffer);
  }

  /**
   * @return the sketch deserialized from {@code buffer} at {@code position}
   */
  @Override
  public synchronized Object get(final ByteBuffer buffer, final int position)
  {
    return MomentSketchWrapper.fromBytes(viewAt(buffer, position));
  }

  @Override
  public float getFloat(final ByteBuffer buffer, final int position)
  {
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public long getLong(final ByteBuffer buffer, final int position)
  {
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public void close()
  {
  }

  @Override
  public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
  {
    inspector.visit("selector", selector);
  }

  /**
   * Returns a duplicate of {@code buffer} positioned at {@code position}, so that the
   * position of the caller's buffer is never modified.
   */
  private static ByteBuffer viewAt(final ByteBuffer buffer, final int position)
  {
    final ByteBuffer view = buffer.duplicate();
    view.position(position);
    return view;
  }
}
package org.apache.druid.query.aggregation.momentsketch.aggregator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;

import java.util.Comparator;
import java.util.Map;
import java.util.Set;

/**
 * Post-aggregator that returns {@link MomentSketchWrapper#getMax()} of the sketch produced
 * by the wrapped {@code field} post-aggregator.
 */
public class MomentSketchMaxPostAggregator implements PostAggregator
{
  public static final String TYPE_NAME = "momentSketchMax";

  // The computed result is a single number, so results are ordered numerically.
  private static final Comparator<Double> COMPARATOR = Comparator.nullsFirst(Comparator.naturalOrder());

  private final String name;
  private final PostAggregator field;

  /**
   * @param name  output name of this post-aggregator; must not be null
   * @param field post-aggregator whose result is the sketch to inspect; must not be null
   */
  @JsonCreator
  public MomentSketchMaxPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field
  )
  {
    this.name = Preconditions.checkNotNull(name, "name is null");
    this.field = Preconditions.checkNotNull(field, "field is null");
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public PostAggregator getField()
  {
    return field;
  }

  /**
   * @return the value of {@code getMax()} on the sketch computed by {@code field}
   */
  @Override
  public Object compute(final Map<String, Object> combinedAggregators)
  {
    final MomentSketchWrapper sketch = (MomentSketchWrapper) field.compute(combinedAggregators);
    return sketch.getMax();
  }

  /**
   * @return a null-tolerant numeric comparator for the values produced by {@link #compute}
   */
  @Override
  public Comparator<Double> getComparator()
  {
    return COMPARATOR;
  }

  @Override
  public Set<String> getDependentFields()
  {
    return field.getDependentFields();
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "name='" + name + '\'' +
           ", field=" + field +
           "}";
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final MomentSketchMaxPostAggregator that = (MomentSketchMaxPostAggregator) o;
    return name.equals(that.name) && field.equals(that.field);
  }

  @Override
  public int hashCode()
  {
    return (name.hashCode() * 31 + field.hashCode());
  }

  /**
   * The cache key is built from the type id and the wrapped field only.
   */
  @Override
  public byte[] getCacheKey()
  {
    return new CacheKeyBuilder(PostAggregatorIds.MOMENTS_SKETCH_TO_MAX_CACHE_TYPE_ID)
        .appendCacheable(field)
        .build();
  }

  @Override
  public PostAggregator decorate(final Map<String, AggregatorFactory> map)
  {
    return this;
  }
}
+ */ + +package org.apache.druid.query.aggregation.momentsketch.aggregator; + +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper; +import org.apache.druid.segment.ColumnValueSelector; + +public class MomentSketchMergeAggregator implements Aggregator +{ + private final ColumnValueSelector selector; + private MomentSketchWrapper momentsSketch; + + public MomentSketchMergeAggregator( + ColumnValueSelector selector, + final int k, + final boolean compress + ) + { + this.selector = selector; + this.momentsSketch = new MomentSketchWrapper(k); + momentsSketch.setCompressed(compress); + } + + @Override + public void aggregate() + { + final MomentSketchWrapper sketch = selector.getObject(); + if (sketch == null) { + return; + } + this.momentsSketch.merge(sketch); + } + + @Override + public Object get() + { + return momentsSketch; + } + + @Override + public float getFloat() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public long getLong() + { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public void close() + { + momentsSketch = null; + } +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeAggregatorFactory.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeAggregatorFactory.java new file mode 100644 index 00000000000..27444589798 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeAggregatorFactory.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch.aggregator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.query.aggregation.Aggregator; +import org.apache.druid.query.aggregation.AggregatorUtil; +import org.apache.druid.query.aggregation.BufferAggregator; +import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; + +public class MomentSketchMergeAggregatorFactory extends MomentSketchAggregatorFactory +{ + public static final String TYPE_NAME = "momentSketchMerge"; + + @JsonCreator + public MomentSketchMergeAggregatorFactory( + @JsonProperty("name") final String name, + @JsonProperty("k") final Integer k, + @JsonProperty("compress") final Boolean compress + ) + { + super(name, name, k, compress, AggregatorUtil.MOMENTS_SKETCH_MERGE_CACHE_TYPE_ID); + } + + @Override + public Aggregator factorize(final ColumnSelectorFactory metricFactory) + { + final ColumnValueSelector selector = metricFactory.makeColumnValueSelector( + getFieldName()); + return new MomentSketchMergeAggregator(selector, getK(), getCompress()); + } + + @Override + public BufferAggregator factorizeBuffered(final ColumnSelectorFactory metricFactory) + { + final ColumnValueSelector selector = 
metricFactory.makeColumnValueSelector( + getFieldName() + ); + return new MomentSketchMergeBufferAggregator(selector, getK(), getCompress()); + } + +} diff --git a/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeBufferAggregator.java b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeBufferAggregator.java new file mode 100644 index 00000000000..505d1ebed97 --- /dev/null +++ b/extensions-contrib/momentsketch/src/main/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentSketchMergeBufferAggregator.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.druid.query.aggregation.momentsketch.aggregator;

import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.ColumnValueSelector;

import java.nio.ByteBuffer;

/**
 * Buffer-backed aggregator that merges {@link MomentSketchWrapper} objects. The accumulated
 * sketch is kept in serialized form inside the caller-provided buffer: every merge reads it
 * back, merges into it and writes it out again at the same position.
 */
public class MomentSketchMergeBufferAggregator implements BufferAggregator
{
  private final ColumnValueSelector<MomentSketchWrapper> selector;
  // Passed to the MomentSketchWrapper constructor when the empty accumulator is created.
  private final int size;
  private final boolean compress;

  /**
   * @param selector source of the sketches to merge
   * @param size     passed to the {@link MomentSketchWrapper} constructor of the accumulator
   * @param compress passed to {@link MomentSketchWrapper#setCompressed(boolean)}
   */
  public MomentSketchMergeBufferAggregator(
      final ColumnValueSelector<MomentSketchWrapper> selector,
      final int size,
      final boolean compress
  )
  {
    this.selector = selector;
    this.size = size;
    this.compress = compress;
  }

  /**
   * Writes an empty sketch into {@code buf} starting at {@code position}.
   */
  @Override
  public void init(final ByteBuffer buf, final int position)
  {
    final MomentSketchWrapper h = new MomentSketchWrapper(size);
    h.setCompressed(compress);

    final ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);
    h.toBytes(mutationBuffer);
  }

  /**
   * Merges the selector's current sketch into the one stored at {@code position};
   * a null sketch leaves the buffer untouched.
   */
  @Override
  public void aggregate(final ByteBuffer buf, final int position)
  {
    final MomentSketchWrapper msNext = selector.getObject();
    if (msNext == null) {
      return;
    }
    final ByteBuffer mutationBuffer = buf.duplicate();
    mutationBuffer.position(position);

    final MomentSketchWrapper ms0 = MomentSketchWrapper.fromBytes(mutationBuffer);
    ms0.merge(msNext);

    // Reading advanced the view; rewind it so the merged sketch overwrites the old one.
    mutationBuffer.position(position);
    ms0.toBytes(mutationBuffer);
  }

  /**
   * @return the sketch deserialized from {@code buf} at {@code position}
   */
  @Override
  public Object get(final ByteBuffer buf, final int position)
  {
    final ByteBuffer readBuffer = buf.asReadOnlyBuffer();
    readBuffer.position(position);
    return MomentSketchWrapper.fromBytes(readBuffer);
  }

  @Override
  public float getFloat(final ByteBuffer buf, final int position)
  {
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public long getLong(final ByteBuffer buf, final int position)
  {
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public double getDouble(final ByteBuffer buf, final int position)
  {
    throw new UnsupportedOperationException("Not implemented");
  }

  @Override
  public void close()
  {
  }

  @Override
  public void inspectRuntimeShape(final RuntimeShapeInspector inspector)
  {
    inspector.visit("selector", selector);
  }
}
package org.apache.druid.query.aggregation.momentsketch.aggregator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;

import java.util.Comparator;
import java.util.Map;
import java.util.Set;

/**
 * Post-aggregator that returns {@link MomentSketchWrapper#getMin()} of the sketch produced
 * by the wrapped {@code field} post-aggregator.
 */
public class MomentSketchMinPostAggregator implements PostAggregator
{
  public static final String TYPE_NAME = "momentSketchMin";

  // The computed result is a single number, so results are ordered numerically.
  private static final Comparator<Double> COMPARATOR = Comparator.nullsFirst(Comparator.naturalOrder());

  private final String name;
  private final PostAggregator field;

  /**
   * @param name  output name of this post-aggregator; must not be null
   * @param field post-aggregator whose result is the sketch to inspect; must not be null
   */
  @JsonCreator
  public MomentSketchMinPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field
  )
  {
    this.name = Preconditions.checkNotNull(name, "name is null");
    this.field = Preconditions.checkNotNull(field, "field is null");
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public PostAggregator getField()
  {
    return field;
  }

  /**
   * @return the value of {@code getMin()} on the sketch computed by {@code field}
   */
  @Override
  public Object compute(final Map<String, Object> combinedAggregators)
  {
    final MomentSketchWrapper sketch = (MomentSketchWrapper) field.compute(combinedAggregators);
    return sketch.getMin();
  }

  /**
   * @return a null-tolerant numeric comparator for the values produced by {@link #compute}
   */
  @Override
  public Comparator<Double> getComparator()
  {
    return COMPARATOR;
  }

  @Override
  public Set<String> getDependentFields()
  {
    return field.getDependentFields();
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "name='" + name + '\'' +
           ", field=" + field +
           "}";
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final MomentSketchMinPostAggregator that = (MomentSketchMinPostAggregator) o;
    return name.equals(that.name) && field.equals(that.field);
  }

  @Override
  public int hashCode()
  {
    return (name.hashCode() * 31 + field.hashCode());
  }

  /**
   * The cache key is built from the type id and the wrapped field only.
   */
  @Override
  public byte[] getCacheKey()
  {
    return new CacheKeyBuilder(PostAggregatorIds.MOMENTS_SKETCH_TO_MIN_CACHE_TYPE_ID)
        .appendCacheable(field)
        .build();
  }

  @Override
  public PostAggregator decorate(final Map<String, AggregatorFactory> map)
  {
    return this;
  }
}
package org.apache.druid.query.aggregation.momentsketch.aggregator;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper;
import org.apache.druid.query.aggregation.post.PostAggregatorIds;
import org.apache.druid.query.cache.CacheKeyBuilder;

import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;

/**
 * Post-aggregator that calls {@link MomentSketchWrapper#getQuantiles(double[])} with the
 * configured {@code fractions} on the sketch produced by the wrapped {@code field}
 * post-aggregator.
 */
public class MomentSketchQuantilePostAggregator implements PostAggregator
{
  public static final String TYPE_NAME = "momentSketchSolveQuantiles";

  private final String name;
  private final PostAggregator field;
  // Private copy: equals, hashCode and the cache key all depend on these values.
  private final double[] fractions;

  /**
   * @param name      output name of this post-aggregator; must not be null
   * @param field     post-aggregator whose result is the sketch to query; must not be null
   * @param fractions values passed to {@code getQuantiles}; must not be null
   */
  @JsonCreator
  public MomentSketchQuantilePostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field,
      @JsonProperty("fractions") final double[] fractions
  )
  {
    this.name = Preconditions.checkNotNull(name, "name is null");
    this.field = Preconditions.checkNotNull(field, "field is null");
    // Copy defensively so later changes to the caller's array cannot alter this instance.
    this.fractions = Preconditions.checkNotNull(fractions, "array of fractions is null").clone();
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public PostAggregator getField()
  {
    return field;
  }

  /**
   * @return a copy of the configured fractions
   */
  @JsonProperty
  public double[] getFractions()
  {
    return fractions.clone();
  }

  /**
   * @return the {@code double[]} returned by {@code getQuantiles(fractions)} on the sketch
   *         computed by {@code field}
   */
  @Override
  public Object compute(final Map<String, Object> combinedAggregators)
  {
    final MomentSketchWrapper sketch = (MomentSketchWrapper) field.compute(combinedAggregators);
    return sketch.getQuantiles(fractions);
  }

  /**
   * Always throws: the result is an array of quantiles, for which no ordering is defined here.
   */
  @Override
  public Comparator<double[]> getComparator()
  {
    throw new IAE("Comparing arrays of quantiles is not supported");
  }

  @Override
  public Set<String> getDependentFields()
  {
    return field.getDependentFields();
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "name='" + name + '\'' +
           ", field=" + field +
           ", fractions=" + Arrays.toString(fractions) +
           "}";
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final MomentSketchQuantilePostAggregator that = (MomentSketchQuantilePostAggregator) o;
    return name.equals(that.name)
           && Arrays.equals(fractions, that.fractions)
           && field.equals(that.field);
  }

  @Override
  public int hashCode()
  {
    return (name.hashCode() * 31 + field.hashCode()) * 31 + Arrays.hashCode(fractions);
  }

  /**
   * The cache key is built from the type id, the wrapped field and the fractions.
   */
  @Override
  public byte[] getCacheKey()
  {
    return new CacheKeyBuilder(PostAggregatorIds.MOMENTS_SKETCH_TO_QUANTILES_CACHE_TYPE_ID)
        .appendCacheable(field)
        .appendDoubleArray(fractions)
        .build();
  }

  @Override
  public PostAggregator decorate(final Map<String, AggregatorFactory> map)
  {
    return this;
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class MomentSketchWrapperTest +{ + @Test + public void testDeserialize() + { + MomentSketchWrapper mw = new MomentSketchWrapper(10); + mw.setCompressed(false); + mw.add(10); + byte[] bs = mw.toByteArray(); + MomentSketchWrapper mw2 = MomentSketchWrapper.fromByteArray(bs); + + assertEquals(10, mw2.getPowerSums()[1], 1e-10); + } + + @Test + public void testSimpleSolve() + { + MomentSketchWrapper mw = new MomentSketchWrapper(13); + mw.setCompressed(true); + for (int x = 0; x < 101; x++) { + mw.add((double) x); + } + double[] ps = {0.0, 0.5, 1.0}; + double[] qs = mw.getQuantiles(ps); + assertEquals(0, qs[0], 1.0); + assertEquals(50, qs[1], 1.0); + } +} diff --git a/extensions-contrib/momentsketch/src/test/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentsSketchAggregatorTest.java b/extensions-contrib/momentsketch/src/test/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentsSketchAggregatorTest.java new file mode 100644 index 00000000000..2d5293b7191 --- /dev/null +++ b/extensions-contrib/momentsketch/src/test/java/org/apache/druid/query/aggregation/momentsketch/aggregator/MomentsSketchAggregatorTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.momentsketch.aggregator; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.Row; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.query.aggregation.AggregationTestHelper; +import org.apache.druid.query.aggregation.momentsketch.MomentSketchModule; +import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class MomentsSketchAggregatorTest +{ + + private final AggregationTestHelper helper; + + @Rule + public final TemporaryFolder tempFolder = new 
TemporaryFolder(); + + public MomentsSketchAggregatorTest(final GroupByQueryConfig config) + { + DruidModule module = new MomentSketchModule(); + module.configure(null); + helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper( + module.getJacksonModules(), config, tempFolder); + } + + @Parameterized.Parameters(name = "{0}") + public static Collection constructorFeeder() + { + final List constructors = new ArrayList<>(); + for (GroupByQueryConfig config : GroupByQueryRunnerTest.testConfigs()) { + constructors.add(new Object[]{config}); + } + return constructors; + } + + // this is to test Json properties and equals + @Test + public void serializeDeserializeFactoryWithFieldName() throws Exception + { + ObjectMapper objectMapper = new DefaultObjectMapper(); + MomentSketchAggregatorFactory factory = new MomentSketchAggregatorFactory( + "name", "fieldName", 128, true + ); + + MomentSketchAggregatorFactory other = objectMapper.readValue( + objectMapper.writeValueAsString(factory), + MomentSketchAggregatorFactory.class + ); + + assertEquals(factory, other); + } + + @Test + public void buildingSketchesAtIngestionTime() throws Exception + { + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("doubles_build_data.tsv").getFile()), + String.join( + "\n", + "{", + " \"type\": \"string\",", + " \"parseSpec\": {", + " \"format\": \"tsv\",", + " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},", + " \"dimensionsSpec\": {", + " \"dimensions\": [\"product\"],", + " \"dimensionExclusions\": [ \"sequenceNumber\"],", + " \"spatialDimensions\": []", + " },", + " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", + " }", + "}" + ), + "[{\"type\": \"momentSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 10, \"compress\": true}]", + 0, + // minTimestamp + Granularities.NONE, + 10, + // maxRowCount + String.join( + "\n", + "{", + " \"queryType\": 
\"groupBy\",", + " \"dataSource\": \"test_datasource\",", + " \"granularity\": \"ALL\",", + " \"dimensions\": [],", + " \"aggregations\": [", + " {\"type\": \"momentSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"k\": 10, \"compress\": true}", + " ],", + " \"postAggregations\": [", + " {\"type\": \"momentSketchSolveQuantiles\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"momentSketchMin\", \"name\": \"min\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},", + " {\"type\": \"momentSketchMax\", \"name\": \"max\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}", + " ],", + " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", + "}" + ) + ); + List results = seq.toList(); + assertEquals(1, results.size()); + Row row = results.get(0); + double[] quantilesArray = (double[]) row.getRaw("quantiles"); + assertEquals(0, quantilesArray[0], 0.05); + assertEquals(.5, quantilesArray[1], 0.05); + assertEquals(1.0, quantilesArray[2], 0.05); + + Double minValue = (Double) row.getRaw("min"); + assertEquals(0.0011, minValue, 0.0001); + + Double maxValue = (Double) row.getRaw("max"); + assertEquals(0.9969, maxValue, 0.0001); + + MomentSketchWrapper sketchObject = (MomentSketchWrapper) row.getRaw("sketch"); + assertEquals(400.0, sketchObject.getPowerSums()[0], 1e-10); + } + + @Test + public void buildingSketchesAtQueryTime() throws Exception + { + Sequence seq = helper.createIndexAndRunQueryOnSegment( + new File(this.getClass().getClassLoader().getResource("doubles_build_data.tsv").getFile()), + String.join( + "\n", + "{", + " \"type\": \"string\",", + " \"parseSpec\": {", + " \"format\": \"tsv\",", + " \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},", + " \"dimensionsSpec\": {", + " \"dimensions\": [ \"product\"],", + " \"dimensionExclusions\": [\"sequenceNumber\"],", + " 
\"spatialDimensions\": []", + " },", + " \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]", + " }", + "}" + ), + "[{\"type\": \"doubleSum\", \"name\": \"value\", \"fieldName\": \"value\"}]", + 0, // minTimestamp + Granularities.NONE, + 10, // maxRowCount + String.join( + "\n", + "{", + " \"queryType\": \"groupBy\",", + " \"dataSource\": \"test_datasource\",", + " \"granularity\": \"ALL\",", + " \"dimensions\": [],", + " \"aggregations\": [", + " {\"type\": \"momentSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 10}", + " ],", + " \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]", + "}" + ) + ); + + List results = seq.toList(); + assertEquals(1, results.size()); + Row row = results.get(0); + + MomentSketchWrapper sketchObject = (MomentSketchWrapper) row.getRaw("sketch"); + // 9 total products since we pre-sum the values. + assertEquals(9.0, sketchObject.getPowerSums()[0], 1e-10); + } +} + diff --git a/extensions-contrib/momentsketch/src/test/resources/doubles_build_data.tsv b/extensions-contrib/momentsketch/src/test/resources/doubles_build_data.tsv new file mode 100644 index 00000000000..bb59faf3da8 --- /dev/null +++ b/extensions-contrib/momentsketch/src/test/resources/doubles_build_data.tsv @@ -0,0 +1,400 @@ +2016010101 0 0 0.6529403005319299 +2016010101 1 0 0.9270214958987323 +2016010101 2 0 0.6383273609981486 +2016010101 3 0 0.8088289215633632 +2016010101 4 0 0.8163864917598281 +2016010101 5 0 0.38484848588530784 +2016010101 6 0 0.7690020468986823 +2016010101 7 0 0.6212078833139824 +2016010101 8 0 0.4915825094949512 +2016010101 9 0 0.688004059332008 +2016010101 10 0 0.2536908275250508 +2016010101 11 0 0.6618435914290263 +2016010101 12 0 0.7892773595797635 +2016010101 13 0 0.08857624134076048 +2016010101 14 0 0.11992633801904151 +2016010101 15 0 0.4959192800105586 +2016010101 16 0 0.5564893557708243 +2016010101 17 0 0.7755547456799993 +2016010101 18 0 0.06420706406984311 +2016010101 19 0 
0.23085639094262378 +2016010101 20 7 0.012013916725163498 +2016010101 21 7 0.34077219818209503 +2016010101 22 7 0.8445966884204918 +2016010101 23 7 0.6466142718287953 +2016010101 24 7 0.43959032391415487 +2016010101 25 7 0.7768829233737787 +2016010101 26 7 0.5899544206136442 +2016010101 27 7 0.017782361911801825 +2016010101 28 7 0.5431916165782864 +2016010101 29 7 0.8218253174439416 +2016010101 30 7 0.6372788284951859 +2016010101 31 7 0.41403671834680933 +2016010101 32 7 0.042508330730374855 +2016010101 33 7 0.7416290691530969 +2016010101 34 7 0.6990557213726277 +2016010101 35 7 0.6302154208823348 +2016010101 36 7 0.021053567154993402 +2016010101 37 7 0.770280353784988 +2016010101 38 7 0.08205576978448703 +2016010101 39 7 0.2049660800682488 +2016010101 40 5 0.08129304678049831 +2016010101 41 5 0.17754747271638005 +2016010101 42 5 0.8441702357096768 +2016010101 43 5 0.9060464737257796 +2016010101 44 5 0.5970595512785409 +2016010101 45 5 0.843859346312315 +2016010101 46 5 0.1649847892987305 +2016010101 47 5 0.5279903496999094 +2016010101 48 5 0.08758749830556767 +2016010101 49 5 0.6088480522002063 +2016010101 50 5 0.31079133043670004 +2016010101 51 5 0.43062105356651226 +2016010101 52 5 0.8542989852099488 +2016010101 53 5 0.42443162807834045 +2016010101 54 5 0.5020327054358468 +2016010101 55 5 0.36453920012074237 +2016010101 56 5 0.9884597580348689 +2016010101 57 5 0.3770559586575706 +2016010101 58 5 0.5989237303385875 +2016010101 59 5 0.9926342802399872 +2016010101 60 4 0.7813961047849703 +2016010101 61 4 0.062171533805525425 +2016010101 62 4 0.5284977503473608 +2016010101 63 4 0.5924687065581794 +2016010101 64 4 0.06305234223879275 +2016010101 65 4 0.4959562731747129 +2016010101 66 4 0.6336733165353365 +2016010101 67 4 0.48860263540869875 +2016010101 68 4 0.9387610528974851 +2016010101 69 4 0.3391271652731308 +2016010101 70 4 0.5962837638971421 +2016010101 71 4 0.9190447294921896 +2016010101 72 4 0.33082943548872534 +2016010101 73 4 0.6236359023672029 +2016010101 
74 4 0.27134427542016615 +2016010101 75 4 0.11665530238761901 +2016010101 76 4 0.10469260335277608 +2016010101 77 4 0.6824658847771211 +2016010101 78 4 0.6131047630496756 +2016010101 79 4 0.9838171536972515 +2016010101 80 4 0.7484669110852756 +2016010101 81 4 0.797620888697219 +2016010101 82 4 0.7166673353657907 +2016010101 83 4 0.46968710353176557 +2016010101 84 4 0.3998491199643106 +2016010101 85 4 0.6314883585976869 +2016010101 86 4 0.8305617875577815 +2016010101 87 4 0.6867651870284084 +2016010101 88 4 0.9961677044887979 +2016010101 89 4 0.19745766301180412 +2016010101 90 4 0.2737652043079263 +2016010101 91 4 0.2954503444695358 +2016010101 92 4 0.6191902196833489 +2016010101 93 4 0.6828058006233482 +2016010101 94 4 0.7967115641510757 +2016010101 95 4 0.5485460823820962 +2016010101 96 4 0.4278132830938558 +2016010101 97 4 0.32194908458166194 +2016010101 98 4 0.07094920295725238 +2016010101 99 4 0.4351839393889565 +2016010101 100 1 0.6160833396611648 +2016010101 101 1 0.4652667787803648 +2016010101 102 1 0.5026953463132913 +2016010101 103 1 0.4103237191034753 +2016010101 104 1 0.3298554666697301 +2016010101 105 1 0.16907537273919138 +2016010101 106 1 0.6945260598989513 +2016010101 107 1 0.917138530496438 +2016010101 108 1 0.8810129148605083 +2016010101 109 1 0.11845626048380542 +2016010101 110 1 0.8848971155827816 +2016010101 111 1 0.9969103769603667 +2016010101 112 1 0.06274198529295416 +2016010101 113 1 0.2923616769686519 +2016010101 114 1 0.12621083638328634 +2016010101 115 1 0.9655188575577313 +2016010101 116 1 0.6074995164352884 +2016010101 117 1 0.5501887988201414 +2016010101 118 1 0.9406914128003497 +2016010101 119 1 0.03264873659277656 +2016010101 120 6 0.004852543443656487 +2016010101 121 6 0.11161194329252788 +2016010101 122 6 0.9403527002796559 +2016010101 123 6 0.8951866979503953 +2016010101 124 6 0.07629846897033454 +2016010101 125 6 0.9898485014275873 +2016010101 126 6 0.42827377712188075 +2016010101 127 6 0.4274796777951825 +2016010101 128 6 
0.5569522946332676 +2016010101 129 6 0.028195121559112635 +2016010101 130 6 0.8599127909482382 +2016010101 131 6 0.3516112293128607 +2016010101 132 6 0.3888868189342449 +2016010101 133 6 0.644589126160206 +2016010101 134 6 0.7398741071492928 +2016010101 135 6 0.1998479248216123 +2016010101 136 6 0.8803215884594476 +2016010101 137 6 0.7079531966558515 +2016010101 138 6 0.7904290564015343 +2016010101 139 6 0.475671788742007 +2016010101 140 3 0.034708334899357096 +2016010101 141 3 0.4134637419532796 +2016010101 142 3 0.9757934592902832 +2016010101 143 3 0.37422347371609666 +2016010101 144 3 0.5904996168737154 +2016010101 145 3 0.5883259679727514 +2016010101 146 3 0.3380286015499171 +2016010101 147 3 0.42174393035143043 +2016010101 148 3 0.4764900074141757 +2016010101 149 3 0.01864239537224921 +2016010101 150 3 0.9124007087743986 +2016010101 151 3 0.8951275235699193 +2016010101 152 3 0.7037272142266654 +2016010101 153 3 0.5685506209266902 +2016010101 154 3 0.4104883958833594 +2016010101 155 3 0.7794005551450208 +2016010101 156 3 0.2879354697088996 +2016010101 157 3 0.5243215707259823 +2016010101 158 3 0.22238840286136063 +2016010101 159 3 0.11336472553284738 +2016010101 160 4 0.9800770037725316 +2016010101 161 4 0.7628237317889158 +2016010101 162 4 0.5355335935170453 +2016010101 163 4 0.9676939330565402 +2016010101 164 4 0.657825753108034 +2016010101 165 4 0.9175328548944673 +2016010101 166 4 0.6834666043257283 +2016010101 167 4 0.08580759367942314 +2016010101 168 4 0.3134740602060899 +2016010101 169 4 0.3218818254752742 +2016010101 170 4 0.6119297354994999 +2016010101 171 4 0.07086832750773142 +2016010101 172 4 0.2700864307032772 +2016010101 173 4 0.7497315076673637 +2016010101 174 4 0.4959921300968493 +2016010101 175 4 0.09294825796093753 +2016010101 176 4 0.4954515904444161 +2016010101 177 4 0.8820366880191506 +2016010101 178 4 0.17978298283728522 +2016010101 179 4 0.05259679741524781 +2016010101 180 5 0.4711892966981096 +2016010101 181 5 0.5965662941715105 
+2016010101 182 5 0.4775201668966973 +2016010101 183 5 0.05084576687030873 +2016010101 184 5 0.16680660677593928 +2016010101 185 5 0.9342287333653685 +2016010101 186 5 0.8153161893769392 +2016010101 187 5 0.9362517669519288 +2016010101 188 5 0.10865218471840699 +2016010101 189 5 0.44665378915111065 +2016010101 190 5 0.8804454791937898 +2016010101 191 5 0.20666928346935398 +2016010101 192 5 0.7052479677101612 +2016010101 193 5 0.5006205470200923 +2016010101 194 5 0.23220501028575968 +2016010101 195 5 0.11776507130391467 +2016010101 196 5 0.592011744069295 +2016010101 197 5 0.7089191450076786 +2016010101 198 5 0.7269340552231702 +2016010101 199 5 0.7049554871226075 +2016010101 200 1 0.44078367400761076 +2016010101 201 1 0.7715264806037321 +2016010101 202 1 0.10151701902103971 +2016010101 203 1 0.661891806135609 +2016010101 204 1 0.23095745116331567 +2016010101 205 1 0.46625278601359255 +2016010101 206 1 0.5912486124707177 +2016010101 207 1 0.963946871892115 +2016010101 208 1 0.8172596270687692 +2016010101 209 1 0.05745699928199144 +2016010101 210 1 0.40612684342877337 +2016010101 211 1 0.6330844777969608 +2016010101 212 1 0.3148973406065705 +2016010101 213 1 0.23230462811318175 +2016010101 214 1 0.9960772952945196 +2016010101 215 1 0.4581376339786414 +2016010101 216 1 0.7181494575770677 +2016010101 217 1 0.04277917580280799 +2016010101 218 1 0.11137419446625674 +2016010101 219 1 0.014716278313423037 +2016010101 220 2 0.8988603727313186 +2016010101 221 2 0.8192124226306603 +2016010101 222 2 0.9304683598956597 +2016010101 223 2 0.4375546733938238 +2016010101 224 2 0.7676359685332207 +2016010101 225 2 0.30977859822027964 +2016010101 226 2 0.008595955287459267 +2016010101 227 2 0.6790605343724216 +2016010101 228 2 0.36949588946147993 +2016010101 229 2 0.3826798435706562 +2016010101 230 2 0.13836513167087128 +2016010101 231 2 0.4451570472364902 +2016010101 232 2 0.8944067771338549 +2016010101 233 2 0.6068095655362902 +2016010101 234 2 0.7084870042917992 +2016010101 235 2 
0.5867363290655241 +2016010101 236 2 0.6903863088381504 +2016010101 237 2 0.30984947936089124 +2016010101 238 2 0.31561088279452665 +2016010101 239 2 0.006286479849849758 +2016010101 240 5 0.34397466439693725 +2016010101 241 5 0.052476003295899964 +2016010101 242 5 0.726106045184451 +2016010101 243 5 0.01559115401009159 +2016010101 244 5 0.9219270739836661 +2016010101 245 5 0.5147917330760431 +2016010101 246 5 0.41919804470784205 +2016010101 247 5 0.4145101775865617 +2016010101 248 5 0.34153038022995796 +2016010101 249 5 0.9503817180587767 +2016010101 250 5 0.6958354849389804 +2016010101 251 5 0.46000811480536297 +2016010101 252 5 0.18379911670616378 +2016010101 253 5 0.20973108758556713 +2016010101 254 5 0.5979201603287885 +2016010101 255 5 0.5552419362393491 +2016010101 256 5 0.10996555307297629 +2016010101 257 5 0.3591453585622102 +2016010101 258 5 0.06098055111386691 +2016010101 259 5 0.5227270267924988 +2016010101 260 0 0.8492702312836989 +2016010101 261 0 0.5941242001151825 +2016010101 262 0 0.6840733026822607 +2016010101 263 0 0.8109777000249937 +2016010101 264 0 0.8599286045013937 +2016010101 265 0 0.7828806670746145 +2016010101 266 0 0.8102260971867188 +2016010101 267 0 0.38306094770114385 +2016010101 268 0 0.7093609268723879 +2016010101 269 0 0.4806583187577358 +2016010101 270 0 0.5766489331365172 +2016010101 271 0 0.7565067278238041 +2016010101 272 0 0.8262768908267573 +2016010101 273 0 0.7951015619138146 +2016010101 274 0 0.1938448910588796 +2016010101 275 0 0.8884608583839426 +2016010101 276 0 0.7046203516594505 +2016010101 277 0 0.5951074760704175 +2016010101 278 0 0.38207409719784036 +2016010101 279 0 0.2445271560830221 +2016010101 280 7 0.6032919624054952 +2016010101 281 7 0.1473220747987144 +2016010101 282 7 0.38396643099307604 +2016010101 283 7 0.4431561135554619 +2016010101 284 7 0.896578318093225 +2016010101 285 7 0.6729206122043515 +2016010101 286 7 0.8498821349478478 +2016010101 287 7 0.48231924024179784 +2016010101 288 7 0.005379480238994816 
+2016010101 289 7 0.8017936717647264 +2016010101 290 7 0.08193232952990348 +2016010101 291 7 0.3422943366454193 +2016010101 292 7 0.6081556855207957 +2016010101 293 7 0.641193222941943 +2016010101 294 7 0.3716858024654186 +2016010101 295 7 0.0011169303830090849 +2016010101 296 7 0.4698784438339285 +2016010101 297 7 0.958198841287214 +2016010101 298 7 0.730945048929339 +2016010101 299 7 0.1858601884405512 +2016010101 300 5 0.1020825694779407 +2016010101 301 5 0.5742385074938443 +2016010101 302 5 0.9846817584978909 +2016010101 303 5 0.3858694391491331 +2016010101 304 5 0.9822246873202894 +2016010101 305 5 0.39822015482143314 +2016010101 306 5 0.6575924137957005 +2016010101 307 5 0.02359557062746842 +2016010101 308 5 0.42059510563039115 +2016010101 309 5 0.5970764856116284 +2016010101 310 5 0.2817399870096221 +2016010101 311 5 0.5334091165258412 +2016010101 312 5 0.31199853410796585 +2016010101 313 5 0.3156991306990594 +2016010101 314 5 0.9560285139855889 +2016010101 315 5 0.7846951771498516 +2016010101 316 5 0.009731486767097897 +2016010101 317 5 0.22625857375026215 +2016010101 318 5 0.8580955944724618 +2016010101 319 5 0.9622008926137687 +2016010101 320 5 0.023872302930851297 +2016010101 321 5 0.3580981601151092 +2016010101 322 5 0.9120442264954038 +2016010101 323 5 0.5968491989965334 +2016010101 324 5 0.5028516120506729 +2016010101 325 5 0.30590552314314 +2016010101 326 5 0.5566430714368423 +2016010101 327 5 0.6441099124064397 +2016010101 328 5 0.8765287851559298 +2016010101 329 5 0.38405928947408385 +2016010101 330 5 0.29654203975364 +2016010101 331 5 0.3606921959261904 +2016010101 332 5 0.9617038824842609 +2016010101 333 5 0.3103700669261584 +2016010101 334 5 0.4935170174690311 +2016010101 335 5 0.34757561267296444 +2016010101 336 5 0.1236918485545484 +2016010101 337 5 0.24925258973306597 +2016010101 338 5 0.4104821367672965 +2016010101 339 5 0.3621850216936935 +2016010101 340 6 0.3816099229918041 +2016010101 341 6 0.9496667754823915 +2016010101 342 6 
0.5594605720642025 +2016010101 343 6 0.8537860901562698 +2016010101 344 6 0.74787202967909 +2016010101 345 6 0.29699361421249604 +2016010101 346 6 0.035943527086235605 +2016010101 347 6 0.20106098029261277 +2016010101 348 6 0.6589994525818863 +2016010101 349 6 0.3851541727199762 +2016010101 350 6 0.12262059605539744 +2016010101 351 6 0.33383436408012057 +2016010101 352 6 0.5087733967157267 +2016010101 353 6 0.34978350071897446 +2016010101 354 6 0.9171509423859847 +2016010101 355 6 0.6395164525815664 +2016010101 356 6 0.659637993918835 +2016010101 357 6 0.5689746534857604 +2016010101 358 6 0.03266513163571427 +2016010101 359 6 0.5863675010868861 +2016010101 360 9 0.8665167898047901 +2016010101 361 9 0.7933960420424948 +2016010101 362 9 0.8409667771425247 +2016010101 363 9 0.9544310598825743 +2016010101 364 9 0.36206869840549716 +2016010101 365 9 0.253957983880155 +2016010101 366 9 0.08496022679431525 +2016010101 367 9 0.5483782518766319 +2016010101 368 9 0.41440902281408365 +2016010101 369 9 0.2947889064970717 +2016010101 370 9 0.659477180019486 +2016010101 371 9 0.9016744422830162 +2016010101 372 9 0.4692828259677926 +2016010101 373 9 0.4221974527778145 +2016010101 374 9 0.26318360778150285 +2016010101 375 9 0.10064081807071767 +2016010101 376 9 0.7781802619858804 +2016010101 377 9 0.529215767115243 +2016010101 378 9 0.21094147073619007 +2016010101 379 9 0.18894985078463877 +2016010101 380 5 0.20683422198832369 +2016010101 381 5 0.9506923735546904 +2016010101 382 5 0.25734447316063913 +2016010101 383 5 0.6439025323539892 +2016010101 384 5 0.9099080819805052 +2016010101 385 5 0.9331714165375404 +2016010101 386 5 0.24979840404324272 +2016010101 387 5 0.40270120064812764 +2016010101 388 5 0.35895113537427137 +2016010101 389 5 0.44814114645480074 +2016010101 390 5 0.437368419580639 +2016010101 391 5 0.2777496228001308 +2016010101 392 5 0.09350862521048608 +2016010101 393 5 0.10366624548706516 +2016010101 394 5 0.8715309310993357 +2016010101 395 5 0.8953111125914557 
+2016010101 396 5 0.9410866942183567 +2016010101 397 5 0.16367286942347592 +2016010101 398 5 0.6995415361957786 +2016010101 399 5 0.7170527361072194 diff --git a/pom.xml b/pom.xml index 6c6b1945f6f..351e735b0e5 100644 --- a/pom.xml +++ b/pom.xml @@ -168,6 +168,7 @@ extensions-contrib/opentsdb-emitter extensions-contrib/materialized-view-maintenance extensions-contrib/materialized-view-selection + extensions-contrib/momentsketch distribution diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java index 19cf467b546..dddafbafd82 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java @@ -117,6 +117,10 @@ public class AggregatorUtil public static final byte BLOOM_FILTER_CACHE_TYPE_ID = 0x34; public static final byte BLOOM_FILTER_MERGE_CACHE_TYPE_ID = 0x35; + // Quantiles sketch in momentsketch extension + public static final byte MOMENTS_SKETCH_BUILD_CACHE_TYPE_ID = 0x36; + public static final byte MOMENTS_SKETCH_MERGE_CACHE_TYPE_ID = 0x37; + /** * returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg * diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java index 6d576a4a783..ea9fe883b03 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/PostAggregatorIds.java @@ -45,4 +45,7 @@ public class PostAggregatorIds public static final byte ZTEST = 21; public static final byte PVALUE_FROM_ZTEST = 22; public static final byte THETA_SKETCH_CONSTANT = 23; + public static final byte MOMENTS_SKETCH_TO_QUANTILES_CACHE_TYPE_ID = 24; + public 
static final byte MOMENTS_SKETCH_TO_MIN_CACHE_TYPE_ID = 25; + public static final byte MOMENTS_SKETCH_TO_MAX_CACHE_TYPE_ID = 26; } diff --git a/processing/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java b/processing/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java index 03122e63335..24c65ac5f5d 100644 --- a/processing/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java +++ b/processing/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java @@ -64,6 +64,7 @@ public class CacheKeyBuilder static final byte STRING_LIST_KEY = 8; static final byte CACHEABLE_KEY = 9; static final byte CACHEABLE_LIST_KEY = 10; + static final byte DOUBLE_ARRAY_KEY = 11; static final byte[] STRING_SEPARATOR = new byte[]{(byte) 0xFF}; static final byte[] EMPTY_BYTES = StringUtils.EMPTY_BYTES; @@ -92,6 +93,13 @@ public class CacheKeyBuilder return buffer.array(); } + private static byte[] doubleArrayToByteArray(double[] input) + { + final ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES * input.length); + buffer.asDoubleBuffer().put(input); + return buffer.array(); + } + private static byte[] cacheableToByteArray(@Nullable Cacheable cacheable) { if (cacheable == null) { @@ -254,6 +262,12 @@ public class CacheKeyBuilder return this; } + public CacheKeyBuilder appendDoubleArray(double[] input) + { + appendItem(DOUBLE_ARRAY_KEY, doubleArrayToByteArray(input)); + return this; + } + public CacheKeyBuilder appendFloatArray(float[] input) { appendItem(FLOAT_ARRAY_KEY, floatArrayToByteArray(input)); diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java index 64f37092a71..ba416d6593a 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexMetrics.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.ISE; import 
javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; /** */ @@ -44,4 +45,11 @@ public class ComplexMetrics } complexSerializers.put(type, serde); } + + public static void registerSerde(String type, Supplier serdeSupplier) + { + if (ComplexMetrics.getSerdeForType(type) == null) { + ComplexMetrics.registerSerde(type, serdeSupplier.get()); + } + } }