diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchAggregator.java index c5c70706119..be197352ca9 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchAggregator.java @@ -40,7 +40,7 @@ public class EmptySketchAggregator implements Aggregator @Override public Object get() { - return SketchOperations.EMPTY_SKETCH; + return SketchHolder.EMPTY; } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java index 5073c1b79b0..263f24c201e 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/EmptySketchBufferAggregator.java @@ -42,7 +42,7 @@ public class EmptySketchBufferAggregator implements BufferAggregator @Override public Object get(ByteBuffer buf, int position) { - return SketchOperations.EMPTY_SKETCH; + return SketchHolder.EMPTY; } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java deleted file mode 100644 index 2590c1fc3fb..00000000000 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/MemoryJsonSerializer.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. 
(Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query.aggregation.datasketches.theta; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.yahoo.sketches.memory.Memory; - -import java.io.IOException; - -/** - */ -public class MemoryJsonSerializer extends JsonSerializer -{ - @Override - public void serialize(Memory mem, JsonGenerator jgen, SerializerProvider provider) - throws IOException, JsonProcessingException - { - jgen.writeBinary(SketchOperations.deserializeFromMemory(mem).toByteArray()); - } -} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java index d49019f34ee..a7944df19dd 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregator.java @@ -20,13 +20,9 @@ package 
io.druid.query.aggregation.datasketches.theta; import com.yahoo.sketches.Family; -import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.theta.SetOperation; -import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Union; - import io.druid.java.util.common.ISE; -import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.Aggregator; import io.druid.segment.ObjectColumnSelector; @@ -34,8 +30,6 @@ import java.util.List; public class SketchAggregator implements Aggregator { - private static final Logger logger = new Logger(SketchAggregator.class); - private final ObjectColumnSelector selector; private Union union; @@ -71,7 +65,7 @@ public class SketchAggregator implements Aggregator //however, advantage of ordered sketch is that they are faster to "union" later //given that results from the aggregator will be combined further, it is better //to return the ordered sketch here - return union.getResult(true, null); + return SketchHolder.of(union.getResult(true, null)); } @Override @@ -100,12 +94,8 @@ public class SketchAggregator implements Aggregator static void updateUnion(Union union, Object update) { - if (update instanceof Memory) { - union.update((Memory) update); - } else if (update instanceof Sketch) { - union.update((Sketch) update); - } else if (update instanceof Union) { - union.update(((Union) update).getResult(false, null)); + if (update instanceof SketchHolder) { + ((SketchHolder) update).updateUnion(union); } else if (update instanceof String) { union.update((String) update); } else if (update instanceof byte[]) { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index dd288932a4d..6419b390b76 100644 --- 
a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -21,16 +21,9 @@ package io.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.Ordering; -import com.google.common.primitives.Doubles; import com.google.common.primitives.Ints; -import com.yahoo.sketches.Family; import com.yahoo.sketches.Util; -import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.theta.SetOperation; -import com.yahoo.sketches.theta.Sketch; -import com.yahoo.sketches.theta.Union; -import io.druid.java.util.common.IAE; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.BufferAggregator; @@ -51,19 +44,6 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory protected final int size; private final byte cacheId; - public static final Comparator COMPARATOR = Ordering.from( - new Comparator() - { - @Override - public int compare(Object o1, Object o2) - { - Sketch s1 = SketchAggregatorFactory.toSketch(o1); - Sketch s2 = SketchAggregatorFactory.toSketch(o2); - return Doubles.compare(s1.getEstimate(), s2.getEstimate()); - } - } - ).nullsFirst(); - public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId) { this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name"); @@ -102,48 +82,19 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory @Override public Object deserialize(Object object) { - return SketchOperations.deserialize(object); + return SketchHolder.deserialize(object); } @Override public Comparator getComparator() { - return COMPARATOR; + return SketchHolder.COMPARATOR; } @Override public 
Object combine(Object lhs, Object rhs) { - final Union union; - if (lhs instanceof Union) { - union = (Union) lhs; - updateUnion(union, rhs); - } else if (rhs instanceof Union) { - union = (Union) rhs; - updateUnion(union, lhs); - } else { - union = (Union) SetOperation.builder().build(size, Family.UNION); - updateUnion(union, lhs); - updateUnion(union, rhs); - } - - - return union; - } - - private void updateUnion(Union union, Object obj) - { - if (obj == null) { - return; - } else if (obj instanceof Memory) { - union.update((Memory) obj); - } else if (obj instanceof Sketch) { - union.update((Sketch) obj); - } else if (obj instanceof Union) { - union.update(((Union) obj).getResult(false, null)); - } else { - throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName()); - } + return SketchHolder.combine(lhs, rhs, size); } @Override @@ -188,17 +139,6 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory .array(); } - public final static Sketch toSketch(Object obj) - { - if (obj instanceof Sketch) { - return (Sketch) obj; - } else if (obj instanceof Union) { - return ((Union) obj).getResult(true, null); - } else { - throw new IAE("Can't convert to Sketch object [%s]", obj.getClass()); - } - } - @Override public String toString() { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java index 04c8b319d02..7f8f6000cbf 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java @@ -25,8 +25,6 @@ import com.yahoo.sketches.memory.MemoryRegion; import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.theta.SetOperation; import 
com.yahoo.sketches.theta.Union; - -import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.BufferAggregator; import io.druid.segment.ObjectColumnSelector; @@ -36,8 +34,6 @@ import java.util.Map; public class SketchBufferAggregator implements BufferAggregator { - private static final Logger logger = new Logger(SketchAggregator.class); - private final ObjectColumnSelector selector; private final int size; private final int maxIntermediateSize; @@ -84,7 +80,7 @@ public class SketchBufferAggregator implements BufferAggregator //however, advantage of ordered sketch is that they are faster to "union" later //given that results from the aggregator will be combined further, it is better //to return the ordered sketch here - return getUnion(buf, position).getResult(true, null); + return SketchHolder.of(getUnion(buf, position).getResult(true, null)); } //Note that this is not threadsafe and I don't think it needs to be diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java index 42e852a0a20..373f7a30cd4 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchEstimatePostAggregator.java @@ -25,13 +25,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.primitives.Doubles; -import com.yahoo.sketches.theta.Sketch; import io.druid.query.aggregation.PostAggregator; import java.util.Comparator; import java.util.Map; import java.util.Set; - public class SketchEstimatePostAggregator implements PostAggregator { @@ -82,16 +80,11 @@ public class SketchEstimatePostAggregator 
implements PostAggregator @Override public Object compute(Map combinedAggregators) { - Sketch sketch = SketchAggregatorFactory.toSketch(field.compute(combinedAggregators)); + SketchHolder holder = (SketchHolder)field.compute(combinedAggregators); if (errorBoundsStdDev != null) { - SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( - sketch.getEstimate(), - sketch.getUpperBound(errorBoundsStdDev), - sketch.getLowerBound(errorBoundsStdDev), - errorBoundsStdDev); - return result; + return holder.getEstimateWithErrorBounds(errorBoundsStdDev); } else { - return sketch.getEstimate(); + return holder.getEstimate(); } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java new file mode 100644 index 00000000000..7752c3c2071 --- /dev/null +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolder.java @@ -0,0 +1,293 @@ +/* +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package io.druid.query.aggregation.datasketches.theta; + +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Doubles; +import com.google.common.primitives.Longs; +import com.yahoo.sketches.Family; +import com.yahoo.sketches.memory.Memory; +import com.yahoo.sketches.memory.NativeMemory; +import com.yahoo.sketches.theta.AnotB; +import com.yahoo.sketches.theta.Intersection; +import com.yahoo.sketches.theta.SetOperation; +import com.yahoo.sketches.theta.Sketch; +import com.yahoo.sketches.theta.Sketches; +import com.yahoo.sketches.theta.Union; +import io.druid.java.util.common.IAE; +import io.druid.java.util.common.ISE; +import org.apache.commons.codec.binary.Base64; + +import java.util.Comparator; + +/** + */ +public class SketchHolder +{ + public static final SketchHolder EMPTY = SketchHolder.of( + Sketches.updateSketchBuilder() + .build() + .compact(true, null) + ); + + public static final Comparator COMPARATOR = Ordering.from( + new Comparator() + { + @Override + public int compare(Object o1, Object o2) + { + SketchHolder h1 = (SketchHolder) o1; + SketchHolder h2 = (SketchHolder) o2; + + if (h1.obj instanceof Sketch || h1.obj instanceof Union) { + if (h2.obj instanceof Sketch || h2.obj instanceof Union) { + return SKETCH_COMPARATOR.compare(h1.getSketch(), h2.getSketch()); + } else { + return -1; + } + } + + if (h1.obj instanceof Memory) { + if (h2.obj instanceof Memory) { + return MEMORY_COMPARATOR.compare((Memory) h1.obj, (Memory) h2.obj); + } else { + return 1; + } + } + + throw new IAE("Unknwon types [%s] and [%s]", h1.obj.getClass().getName(), h2.obj.getClass().getName()); + } + } + ).nullsFirst(); + + private static final Comparator SKETCH_COMPARATOR = new Comparator() + { + @Override + public int compare(Sketch o1, Sketch o2) + { + return Doubles.compare(o1.getEstimate(), o2.getEstimate()); + } + }; + + private static final Comparator 
MEMORY_COMPARATOR = new Comparator() + { + @Override + public int compare(Memory o1, Memory o2) + { + // We have two Ordered Compact sketches, so just compare their last entry if they have the same size. + // This is to produce a deterministic ordering, though it might not match the actual estimate + // ordering, but that's ok because this comparator is only used by GenericIndexed + int retVal = Longs.compare(o1.getCapacity(), o2.getCapacity()); + if (retVal == 0) { + retVal = Longs.compare(o1.getLong(o2.getCapacity() - 8), o2.getLong(o2.getCapacity() - 8)); + } + + return retVal; + } + }; + + + private final Object obj; + + private volatile Double cachedEstimate = null; + private volatile Sketch cachedSketch = null; + + private SketchHolder(Object obj) + { + Preconditions.checkArgument( + obj instanceof Sketch || obj instanceof Union || obj instanceof Memory, + "unknown sketch representation type [%s]", obj.getClass().getName() + ); + this.obj = obj; + } + + public static SketchHolder of(Object obj) + { + return new SketchHolder(obj); + } + + public void updateUnion(Union union) + { + if (obj instanceof Memory) { + union.update((Memory) obj); + } else { + union.update(getSketch()); + } + } + + public Sketch getSketch() + { + if (cachedSketch != null) { + return cachedSketch; + } + + if (obj instanceof Sketch) { + cachedSketch = (Sketch) obj; + } else if (obj instanceof Union) { + cachedSketch = ((Union) obj).getResult(); + } else if (obj instanceof Memory) { + cachedSketch = deserializeFromMemory((Memory) obj); + } else { + throw new ISE("Can't get sketch from object of type [%s]", obj.getClass().getName()); + } + return cachedSketch; + } + + public double getEstimate() + { + if (cachedEstimate == null) { + cachedEstimate = getSketch().getEstimate(); + } + return cachedEstimate.doubleValue(); + } + + public SketchEstimateWithErrorBounds getEstimateWithErrorBounds(int errorBoundsStdDev) + { + Sketch sketch = getSketch(); + SketchEstimateWithErrorBounds result = new 
SketchEstimateWithErrorBounds( + getEstimate(), + sketch.getUpperBound(errorBoundsStdDev), + sketch.getLowerBound(errorBoundsStdDev), + errorBoundsStdDev); + return result; + } + + public static Object combine(Object o1, Object o2, int nomEntries) + { + SketchHolder holder1 = (SketchHolder) o1; + SketchHolder holder2 = (SketchHolder) o2; + + if (holder1.obj instanceof Union) { + Union union = (Union) holder1.obj; + holder2.updateUnion(union); + holder1.invalidateCache(); + return holder1; + } else if (holder2.obj instanceof Union) { + Union union = (Union) holder2.obj; + holder1.updateUnion(union); + holder2.invalidateCache(); + return holder2; + } else { + Union union = (Union) SetOperation.builder().build(nomEntries, Family.UNION); + holder1.updateUnion(union); + holder2.updateUnion(union); + return SketchHolder.of(union); + } + } + + private void invalidateCache() + { + cachedEstimate = null; + cachedSketch = null; + } + + public static SketchHolder deserialize(Object serializedSketch) + { + if (serializedSketch instanceof String) { + return SketchHolder.of(deserializeFromBase64EncodedString((String) serializedSketch)); + } else if (serializedSketch instanceof byte[]) { + return SketchHolder.of(deserializeFromByteArray((byte[]) serializedSketch)); + } else if (serializedSketch instanceof SketchHolder) { + return (SketchHolder) serializedSketch; + } else if (serializedSketch instanceof Sketch + || serializedSketch instanceof Union + || serializedSketch instanceof Memory) { + return SketchHolder.of(serializedSketch); + } + + throw new ISE( + "Object is not of a type[%s] that can be deserialized to sketch.", + serializedSketch.getClass() + ); + } + + private static Sketch deserializeFromBase64EncodedString(String str) + { + return deserializeFromByteArray( + Base64.decodeBase64( + str.getBytes(Charsets.UTF_8) + ) + ); + } + + private static Sketch deserializeFromByteArray(byte[] data) + { + return deserializeFromMemory(new NativeMemory(data)); + } + + private 
static Sketch deserializeFromMemory(Memory mem) + { + if (Sketch.getSerializationVersion(mem) < 3) { + return Sketches.heapifySketch(mem); + } else { + return Sketches.wrapSketch(mem); + } + } + + public static enum Func + { + UNION, + INTERSECT, + NOT; + } + + public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders) + { + //in the code below, I am returning SetOp.getResult(false, null) + //"false" gets us an unordered sketch which is faster to build + //"true" returns an ordered sketch but slower to compute. advantage of ordered sketch + //is that they are faster to "union" later but given that this method is used in + //the final stages of query processing, ordered sketch would be of no use. + switch (func) { + case UNION: + Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION); + for (Object o : holders) { + ((SketchHolder) o).updateUnion(union); + } + return SketchHolder.of(union); + case INTERSECT: + Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION); + for (Object o : holders) { + intersection.update(((SketchHolder) o).getSketch()); + } + return SketchHolder.of(intersection.getResult(false, null)); + case NOT: + if (holders.length < 1) { + throw new IllegalArgumentException("A-Not-B requires atleast 1 sketch"); + } + + if (holders.length == 1) { + return (SketchHolder) holders[0]; + } + + Sketch result = ((SketchHolder) holders[0]).getSketch(); + for (int i = 1; i < holders.length; i++) { + AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B); + anotb.update(result, ((SketchHolder) holders[i]).getSketch()); + result = anotb.getResult(false, null); + } + return SketchHolder.of(result); + default: + throw new IllegalArgumentException("Unknown sketch operation " + func); + } + } +} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchJsonSerializer.java 
b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolderJsonSerializer.java similarity index 75% rename from extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchJsonSerializer.java rename to extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolderJsonSerializer.java index 49fb58a6417..0e3d673b062 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchJsonSerializer.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchHolderJsonSerializer.java @@ -20,19 +20,17 @@ package io.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.SerializerProvider; -import com.yahoo.sketches.theta.Sketch; import java.io.IOException; -public class SketchJsonSerializer extends JsonSerializer +public class SketchHolderJsonSerializer extends JsonSerializer { @Override - public void serialize(Sketch sketch, JsonGenerator jgen, SerializerProvider provider) - throws IOException, JsonProcessingException + public void serialize(SketchHolder sketchHolder, JsonGenerator jgen, SerializerProvider provider) + throws IOException { - jgen.writeBinary(sketch.toByteArray()); + jgen.writeBinary(sketchHolder.getSketch().toByteArray()); } } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index c128958c101..b2514f754a8 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ 
b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -21,7 +21,6 @@ package io.druid.query.aggregation.datasketches.theta; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.yahoo.sketches.theta.Sketch; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.AggregatorFactoryNotMergeableException; @@ -123,16 +122,11 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory public Object finalizeComputation(Object object) { if (shouldFinalize) { - Sketch sketch = SketchAggregatorFactory.toSketch(object); + SketchHolder holder = (SketchHolder) object; if (errorBoundsStdDev != null) { - SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( - sketch.getEstimate(), - sketch.getUpperBound(errorBoundsStdDev), - sketch.getLowerBound(errorBoundsStdDev), - errorBoundsStdDev); - return result; + return holder.getEstimateWithErrorBounds(errorBoundsStdDev); } else { - return sketch.getEstimate(); + return holder.getEstimate(); } } else { return object; diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeComplexMetricSerde.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeComplexMetricSerde.java index cbb8935fad5..45ddd390dc1 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeComplexMetricSerde.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchMergeComplexMetricSerde.java @@ -19,7 +19,6 @@ package io.druid.query.aggregation.datasketches.theta; -import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.theta.Sketch; import io.druid.data.input.InputRow; import io.druid.segment.column.ColumnBuilder; @@ -49,17 +48,17 @@ public class 
SketchMergeComplexMetricSerde extends ComplexMetricSerde @Override public Class extractedClass() { - return Sketch.class; + return Object.class; } @Override public Object extractValue(InputRow inputRow, String metricName) { final Object object = inputRow.getRaw(metricName); - if (object == null || object instanceof Sketch || object instanceof Memory) { + if (object == null) { return object; } - return SketchOperations.deserialize(object); + return SketchHolder.deserialize(object); } }; } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java index 4daea218a59..361dd4a2fbf 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchModule.java @@ -23,9 +23,6 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; -import com.yahoo.sketches.memory.Memory; -import com.yahoo.sketches.theta.Sketch; -import com.yahoo.sketches.theta.Union; import io.druid.initialization.DruidModule; import io.druid.segment.serde.ComplexMetrics; @@ -69,13 +66,7 @@ public class SketchModule implements DruidModule new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG) ) .addSerializer( - Sketch.class, new SketchJsonSerializer() - ) - .addSerializer( - Memory.class, new MemoryJsonSerializer() - ) - .addSerializer( - Union.class, new UnionJsonSerializer() + SketchHolder.class, new SketchHolderJsonSerializer() ) ); } diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java 
b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java index 15932887af5..8e7b1f04638 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchObjectStrategy.java @@ -19,13 +19,9 @@ package io.druid.query.aggregation.datasketches.theta; -import com.google.common.primitives.Longs; -import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.memory.MemoryRegion; import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.theta.Sketch; -import com.yahoo.sketches.theta.Sketches; -import com.yahoo.sketches.theta.Union; import io.druid.java.util.common.IAE; import io.druid.segment.data.ObjectStrategy; @@ -35,74 +31,38 @@ public class SketchObjectStrategy implements ObjectStrategy { private static final byte[] EMPTY_BYTES = new byte[]{}; - private static final Sketch EMPTY_SKETCH = Sketches.updateSketchBuilder().build().compact(true, null); @Override public int compare(Object s1, Object s2) { - if (s1 instanceof Sketch || s1 instanceof Union) { - if (s2 instanceof Sketch || s2 instanceof Union) { - return SketchAggregatorFactory.COMPARATOR.compare(s1, s2); - } else { - return -1; - } - } - - if (s1 instanceof Memory) { - if (s2 instanceof Memory) { - Memory s1Mem = (Memory) s1; - Memory s2Mem = (Memory) s2; - - // We have two Ordered Compact sketches, so just compare their last entry if they have the size. 
- // This is to produce a deterministic ordering, though it might not match the actual estimate - // ordering, but that's ok because this comparator is only used by GenericIndexed - int retVal = Longs.compare(s1Mem.getCapacity(), s2Mem.getCapacity()); - if (retVal == 0) { - retVal = Longs.compare(s1Mem.getLong(s1Mem.getCapacity() - 8), s2Mem.getLong(s2Mem.getCapacity() - 8)); - } - - return retVal; - } else { - return 1; - } - } - - throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1); - + return SketchHolder.COMPARATOR.compare(s1, s2); } @Override - public Class getClazz() + public Class getClazz() { - return Sketch.class; + return Object.class; } @Override public Object fromByteBuffer(ByteBuffer buffer, int numBytes) { if (numBytes == 0) { - return EMPTY_SKETCH; + return SketchHolder.EMPTY; } - return new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes); + return SketchHolder.of(new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes)); } @Override public byte[] toBytes(Object obj) { - if (obj instanceof Sketch) { - Sketch sketch = (Sketch) obj; + if (obj instanceof SketchHolder) { + Sketch sketch = ((SketchHolder) obj).getSketch(); if (sketch.isEmpty()) { return EMPTY_BYTES; } return sketch.toByteArray(); - } else if (obj instanceof Memory) { - Memory mem = (Memory) obj; - byte[] retVal = new byte[(int) mem.getCapacity()]; - mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); - return retVal; - } else if (obj instanceof Union) { - return toBytes(((Union) obj).getResult(true, null)); } else if (obj == null) { return EMPTY_BYTES; } else { diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java deleted file mode 100644 index 095d87c9713..00000000000 --- 
a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchOperations.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.query.aggregation.datasketches.theta; - -import com.google.common.base.Charsets; -import com.yahoo.sketches.Family; -import com.yahoo.sketches.memory.Memory; -import com.yahoo.sketches.memory.NativeMemory; -import com.yahoo.sketches.theta.AnotB; -import com.yahoo.sketches.theta.Intersection; -import com.yahoo.sketches.theta.SetOperation; -import com.yahoo.sketches.theta.Sketch; -import com.yahoo.sketches.theta.Sketches; -import com.yahoo.sketches.theta.Union; - -import io.druid.java.util.common.logger.Logger; - -import org.apache.commons.codec.binary.Base64; - -public class SketchOperations -{ - - private static final Logger LOG = new Logger(SketchOperations.class); - - public static final Sketch EMPTY_SKETCH = Sketches.updateSketchBuilder().build().compact(true, null); - - public static enum Func - { - UNION, - INTERSECT, - NOT; - } - - public static Sketch deserialize(Object serializedSketch) - { - if (serializedSketch instanceof String) { - return deserializeFromBase64EncodedString((String) 
serializedSketch); - } else if (serializedSketch instanceof byte[]) { - return deserializeFromByteArray((byte[]) serializedSketch); - } else if (serializedSketch instanceof Sketch) { - return (Sketch) serializedSketch; - } else if (serializedSketch instanceof Union) { - return ((Union) serializedSketch).getResult(true, null); - } - - throw new IllegalStateException( - "Object is not of a type that can deserialize to sketch: " - + serializedSketch.getClass() - ); - } - - public static Sketch deserializeFromBase64EncodedString(String str) - { - return deserializeFromByteArray( - Base64.decodeBase64( - str.getBytes(Charsets.UTF_8) - ) - ); - } - - public static Sketch deserializeFromByteArray(byte[] data) - { - return deserializeFromMemory(new NativeMemory(data)); - } - - public static Sketch deserializeFromMemory(Memory mem) - { - if (Sketch.getSerializationVersion(mem) < 3) { - return Sketches.heapifySketch(mem); - } else { - return Sketches.wrapSketch(mem); - } - } - - public static Sketch sketchSetOperation(Func func, int sketchSize, Sketch... sketches) - { - //in the code below, I am returning SetOp.getResult(false, null) - //"false" gets us an unordered sketch which is faster to build - //"true" returns an ordered sketch but slower to compute. advantage of ordered sketch - //is that they are faster to "union" later but given that this method is used in - //the final stages of query processing, ordered sketch would be of no use. 
- switch (func) { - case UNION: - Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION); - for (Sketch sketch : sketches) { - union.update(sketch); - } - return union.getResult(false, null); - case INTERSECT: - Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION); - for (Sketch sketch : sketches) { - intersection.update(sketch); - } - return intersection.getResult(false, null); - case NOT: - if (sketches.length < 1) { - throw new IllegalArgumentException("A-Not-B requires atleast 1 sketch"); - } - - if (sketches.length == 1) { - return sketches[0]; - } - - Sketch result = sketches[0]; - for (int i = 1; i < sketches.length; i++) { - AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B); - anotb.update(result, sketches[i]); - result = anotb.getResult(false, null); - } - return result; - default: - throw new IllegalArgumentException("Unknown sketch operation " + func); - } - } -} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java index a4f023cd431..40849ce37a4 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/SketchSetPostAggregator.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; import com.yahoo.sketches.Util; -import com.yahoo.sketches.theta.Sketch; import io.druid.java.util.common.IAE; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.PostAggregator; @@ -40,7 +39,7 @@ public class SketchSetPostAggregator implements PostAggregator private final 
String name; private final List fields; - private final SketchOperations.Func func; + private final SketchHolder.Func func; private final int maxSketchSize; @JsonCreator @@ -53,7 +52,7 @@ public class SketchSetPostAggregator implements PostAggregator { this.name = name; this.fields = fields; - this.func = SketchOperations.Func.valueOf(func); + this.func = SketchHolder.Func.valueOf(func); this.maxSketchSize = maxSize == null ? SketchAggregatorFactory.DEFAULT_MAX_SKETCH_SIZE : maxSize; Util.checkIfPowerOf2(this.maxSketchSize, "size"); @@ -75,18 +74,18 @@ public class SketchSetPostAggregator implements PostAggregator @Override public Comparator getComparator() { - return SketchAggregatorFactory.COMPARATOR; + return SketchHolder.COMPARATOR; } @Override public Object compute(final Map combinedAggregators) { - Sketch[] sketches = new Sketch[fields.size()]; + Object[] sketches = new Object[fields.size()]; for (int i = 0; i < sketches.length; i++) { - sketches[i] = SketchAggregatorFactory.toSketch(fields.get(i).compute(combinedAggregators)); + sketches[i] = fields.get(i).compute(combinedAggregators); } - return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches); + return SketchHolder.sketchSetOperation(func, maxSketchSize, sketches); } @Override diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java deleted file mode 100644 index 865d268ec5b..00000000000 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/UnionJsonSerializer.java +++ /dev/null @@ -1,40 +0,0 @@ -/* -* Licensed to Metamarkets Group Inc. (Metamarkets) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. 
Metamarkets licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package io.druid.query.aggregation.datasketches.theta; - -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.yahoo.sketches.theta.Union; - -import java.io.IOException; - -/** - */ -public class UnionJsonSerializer extends JsonSerializer -{ - @Override - public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider) - throws IOException, JsonProcessingException - { - jgen.writeBinary(union.getResult(true, null).toByteArray()); - } -} diff --git a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchModule.java b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchModule.java index 6b32e60e1d7..3f5144f8d78 100644 --- a/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchModule.java +++ b/extensions-core/datasketches/src/main/java/io/druid/query/aggregation/datasketches/theta/oldapi/OldApiSketchModule.java @@ -23,10 +23,10 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.inject.Binder; 
-import com.yahoo.sketches.theta.Sketch; import io.druid.initialization.DruidModule; import io.druid.query.aggregation.datasketches.theta.SketchBuildComplexMetricSerde; -import io.druid.query.aggregation.datasketches.theta.SketchJsonSerializer; +import io.druid.query.aggregation.datasketches.theta.SketchHolder; +import io.druid.query.aggregation.datasketches.theta.SketchHolderJsonSerializer; import io.druid.query.aggregation.datasketches.theta.SketchMergeComplexMetricSerde; import io.druid.query.aggregation.datasketches.theta.SketchModule; import io.druid.segment.serde.ComplexMetrics; @@ -80,7 +80,7 @@ public class OldApiSketchModule implements DruidModule new NamedType(OldSketchEstimatePostAggregator.class, "sketchEstimate"), new NamedType(OldSketchSetPostAggregator.class, "sketchSetOper") ) - .addSerializer(Sketch.class, new SketchJsonSerializer()) + .addSerializer(SketchHolder.class, new SketchHolderJsonSerializer()) ); } } diff --git a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java index d6243e47d0b..951e9ee9210 100644 --- a/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java +++ b/extensions-core/datasketches/src/test/java/io/druid/query/aggregation/datasketches/theta/SketchAggregationTest.java @@ -206,7 +206,7 @@ public class SketchAggregationTest @Test public void testSketchMergeFinalization() throws Exception { - Sketch sketch = Sketches.updateSketchBuilder().build(128); + SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().build(128)); SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null); Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001); @@ -344,7 +344,7 @@ public class SketchAggregationTest 
@Test public void testSketchAggregatorFactoryComparator() { - Comparator comparator = SketchAggregatorFactory.COMPARATOR; + Comparator comparator = SketchHolder.COMPARATOR; Assert.assertEquals(0, comparator.compare(null, null)); Union union1 = (Union) SetOperation.builder().build(1<<4, Family.UNION); @@ -352,8 +352,8 @@ public class SketchAggregationTest union1.update("b"); Sketch sketch1 = union1.getResult(); - Assert.assertEquals(-1, comparator.compare(null, sketch1)); - Assert.assertEquals(1, comparator.compare(sketch1, null)); + Assert.assertEquals(-1, comparator.compare(null, SketchHolder.of(sketch1))); + Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch1), null)); Union union2 = (Union) SetOperation.builder().build(1<<4, Family.UNION); union2.update("a"); @@ -361,12 +361,12 @@ public class SketchAggregationTest union2.update("c"); Sketch sketch2 = union2.getResult(); - Assert.assertEquals(-1, comparator.compare(sketch1, sketch2)); - Assert.assertEquals(-1, comparator.compare(sketch1, union2)); - Assert.assertEquals(1, comparator.compare(sketch2, sketch1)); - Assert.assertEquals(1, comparator.compare(sketch2, union1)); - Assert.assertEquals(1, comparator.compare(union2, union1)); - Assert.assertEquals(1, comparator.compare(union2, sketch1)); + Assert.assertEquals(-1, comparator.compare(SketchHolder.of(sketch1), SketchHolder.of(sketch2))); + Assert.assertEquals(-1, comparator.compare(SketchHolder.of(sketch1), SketchHolder.of(union2))); + Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch2), SketchHolder.of(sketch1))); + Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch2), SketchHolder.of(union1))); + Assert.assertEquals(1, comparator.compare(SketchHolder.of(union2), SketchHolder.of(union1))); + Assert.assertEquals(1, comparator.compare(SketchHolder.of(union2), SketchHolder.of(sketch1))); } private void assertPostAggregatorSerde(PostAggregator agg) throws Exception