From 1523de08fba12c830d41721666a21ef28c14c201 Mon Sep 17 00:00:00 2001 From: Himanshu Date: Wed, 5 Oct 2016 10:40:14 -0500 Subject: [PATCH] SketchAggregatorFactory.combine(..) returns Union object now so that it can be reused across multiple combine(..) calls (#3471) --- .../datasketches/theta/SketchAggregator.java | 2 + .../theta/SketchAggregatorFactory.java | 21 ++++++++-- .../theta/SketchEstimatePostAggregator.java | 2 +- .../datasketches/theta/SketchModule.java | 7 +++- .../theta/SketchObjectStrategy.java | 3 ++ .../theta/SketchSetPostAggregator.java | 14 ++++++- .../theta/UnionJsonSerializer.java | 40 +++++++++++++++++++ 7 files changed, 82 insertions(+), 7 deletions(-) create mode 100644 extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java index 8e1e7643f32..b9a369c3335 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -107,6 +107,8 @@ public class SketchAggregator implements Aggregator union.update((Memory) update); } else if (update instanceof Sketch) { union.update((Sketch) update); + } else if (update instanceof Union) { + union.update(((Union) update).getResult(false, null)); } else if (update instanceof String) { union.update((String) update); } else if (update instanceof byte[]) { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index 457d5f6b12b..f45d08e607e 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -110,10 +110,21 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory @Override public Object combine(Object lhs, Object rhs) { - Union union = (Union) SetOperation.builder().build(size, Family.UNION); - updateUnion(union, lhs); - updateUnion(union, rhs); - return union.getResult(false, null); + final Union union; + if (lhs instanceof Union) { + union = (Union) lhs; + updateUnion(union, rhs); + } else if (rhs instanceof Union) { + union = (Union) rhs; + updateUnion(union, lhs); + } else { + union = (Union) SetOperation.builder().build(size, Family.UNION); + updateUnion(union, lhs); + updateUnion(union, rhs); + } + + + return union; } private void updateUnion(Union union, Object obj) @@ -124,6 +135,8 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory union.update((Memory) obj); } else if (obj instanceof Sketch) { union.update((Sketch) obj); + } else if (obj instanceof Union) { + union.update(((Union) obj).getResult(false, null)); } else { throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName()); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 34bb7e1e2aa..4c5156c58f4 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -82,7 +82,7 @@ public class SketchEstimatePostAggregator implements PostAggregator @Override public Object compute(Map combinedAggregators) { - Sketch sketch = (Sketch) field.compute(combinedAggregators); + Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(combinedAggregators)); if (errorBoundsStdDev != null) { SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( sketch.getEstimate(), diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java index 84b0853cf22..4daea218a59 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Union; import io.druid.initialization.DruidModule; import io.druid.segment.serde.ComplexMetrics; @@ -71,7 +72,11 @@ public class SketchModule implements DruidModule Sketch.class, new SketchJsonSerializer() ) .addSerializer( - Memory.class, new MemoryJsonSerializer()) + Memory.class, new MemoryJsonSerializer() + ) + .addSerializer( + Union.class, new UnionJsonSerializer() + ) ); } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java index a09146855a2..99aae90d974 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java @@ -26,6 +26,7 @@ import com.yahoo.sketches.memory.MemoryRegion; import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.Union; import io.druid.segment.data.ObjectStrategy; import java.nio.ByteBuffer; @@ -98,6 +99,8 @@ public class SketchObjectStrategy implements ObjectStrategy byte[] retVal = new byte[(int) mem.getCapacity()]; mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); return retVal; + } else if (obj instanceof Union) { + return toBytes(((Union) obj).getResult(true, null)); } else if (obj == null) { return EMPTY_BYTES; } else { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index a57e87f0833..d96abcad4d1 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -26,6 +26,7 @@ import com.metamx.common.IAE; import com.metamx.common.logger.Logger; import com.yahoo.sketches.Util; import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Union; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; @@ -83,12 +84,23 @@ public class SketchSetPostAggregator implements PostAggregator { Sketch[] sketches = new Sketch[fields.size()]; for (int i = 0; i < sketches.length; i++) { - sketches[i] = (Sketch) fields.get(i).compute(combinedAggregators); + sketches[i] = toSketch(fields.get(i).compute(combinedAggregators)); } return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches); } + public final static Sketch toSketch(Object obj) + { + if (obj instanceof Sketch) { + return (Sketch) obj; + } else if (obj instanceof Union) { + return ((Union) obj).getResult(true, null); + } else { + throw new IAE("Can't convert to Sketch object [%s]", obj.getClass()); + } + } + @Override @JsonProperty public String getName() diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java new file mode 100644 index 00000000000..865d268ec5b --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java @@ -0,0 +1,40 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package io.druid.query.aggregation.datasketches.theta; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.yahoo.sketches.theta.Union; + +import java.io.IOException; + +/** + */ +public class UnionJsonSerializer extends JsonSerializer +{ + @Override + public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException + { + jgen.writeBinary(union.getResult(true, null).toByteArray()); + } +}