consolidate different theta sketch representations into SketchHolder (#3671)

This commit is contained in:
Himanshu 2016-11-11 12:20:41 -06:00 committed by Fangjin Yang
parent 52a74cf84f
commit ddc078926b
18 changed files with 342 additions and 401 deletions

View File

@ -40,7 +40,7 @@ public class EmptySketchAggregator implements Aggregator
@Override
public Object get()
{
return SketchOperations.EMPTY_SKETCH;
return SketchHolder.EMPTY;
}
@Override

View File

@ -42,7 +42,7 @@ public class EmptySketchBufferAggregator implements BufferAggregator
@Override
public Object get(ByteBuffer buf, int position)
{
return SketchOperations.EMPTY_SKETCH;
return SketchHolder.EMPTY;
}
@Override

View File

@ -1,40 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.yahoo.sketches.memory.Memory;
import java.io.IOException;
/**
*/
public class MemoryJsonSerializer extends JsonSerializer<Memory>
{
@Override
public void serialize(Memory mem, JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeBinary(SketchOperations.deserializeFromMemory(mem).toByteArray());
}
}

View File

@ -20,13 +20,9 @@
package io.druid.query.aggregation.datasketches.theta;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;
@ -34,8 +30,6 @@ import java.util.List;
public class SketchAggregator implements Aggregator
{
private static final Logger logger = new Logger(SketchAggregator.class);
private final ObjectColumnSelector selector;
private Union union;
@ -71,7 +65,7 @@ public class SketchAggregator implements Aggregator
//however, advantage of ordered sketch is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return union.getResult(true, null);
return SketchHolder.of(union.getResult(true, null));
}
@Override
@ -100,12 +94,8 @@ public class SketchAggregator implements Aggregator
static void updateUnion(Union union, Object update)
{
if (update instanceof Memory) {
union.update((Memory) update);
} else if (update instanceof Sketch) {
union.update((Sketch) update);
} else if (update instanceof Union) {
union.update(((Union) update).getResult(false, null));
if (update instanceof SketchHolder) {
((SketchHolder) update).updateUnion(union);
} else if (update instanceof String) {
union.update((String) update);
} else if (update instanceof byte[]) {

View File

@ -21,16 +21,9 @@ package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Ints;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
@ -51,19 +44,6 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
protected final int size;
private final byte cacheId;
public static final Comparator<Object> COMPARATOR = Ordering.from(
new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
Sketch s1 = SketchAggregatorFactory.toSketch(o1);
Sketch s2 = SketchAggregatorFactory.toSketch(o2);
return Doubles.compare(s1.getEstimate(), s2.getEstimate());
}
}
).nullsFirst();
public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId)
{
this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
@ -102,48 +82,19 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
@Override
public Object deserialize(Object object)
{
return SketchOperations.deserialize(object);
return SketchHolder.deserialize(object);
}
@Override
public Comparator<Object> getComparator()
{
return COMPARATOR;
return SketchHolder.COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
final Union union;
if (lhs instanceof Union) {
union = (Union) lhs;
updateUnion(union, rhs);
} else if (rhs instanceof Union) {
union = (Union) rhs;
updateUnion(union, lhs);
} else {
union = (Union) SetOperation.builder().build(size, Family.UNION);
updateUnion(union, lhs);
updateUnion(union, rhs);
}
return union;
}
private void updateUnion(Union union, Object obj)
{
if (obj == null) {
return;
} else if (obj instanceof Memory) {
union.update((Memory) obj);
} else if (obj instanceof Sketch) {
union.update((Sketch) obj);
} else if (obj instanceof Union) {
union.update(((Union) obj).getResult(false, null));
} else {
throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName());
}
return SketchHolder.combine(lhs, rhs, size);
}
@Override
@ -188,17 +139,6 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
.array();
}
public final static Sketch toSketch(Object obj)
{
if (obj instanceof Sketch) {
return (Sketch) obj;
} else if (obj instanceof Union) {
return ((Union) obj).getResult(true, null);
} else {
throw new IAE("Can't convert to Sketch object [%s]", obj.getClass());
}
}
@Override
public String toString()
{

View File

@ -25,8 +25,6 @@ import com.yahoo.sketches.memory.MemoryRegion;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;
@ -36,8 +34,6 @@ import java.util.Map;
public class SketchBufferAggregator implements BufferAggregator
{
private static final Logger logger = new Logger(SketchAggregator.class);
private final ObjectColumnSelector selector;
private final int size;
private final int maxIntermediateSize;
@ -84,7 +80,7 @@ public class SketchBufferAggregator implements BufferAggregator
//however, advantage of ordered sketch is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return getUnion(buf, position).getResult(true, null);
return SketchHolder.of(getUnion(buf, position).getResult(true, null));
}
//Note that this is not threadsafe and I don't think it needs to be

View File

@ -25,13 +25,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.primitives.Doubles;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.PostAggregator;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
public class SketchEstimatePostAggregator implements PostAggregator
{
@ -82,16 +80,11 @@ public class SketchEstimatePostAggregator implements PostAggregator
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
Sketch sketch = SketchAggregatorFactory.toSketch(field.compute(combinedAggregators));
SketchHolder holder = (SketchHolder)field.compute(combinedAggregators);
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
errorBoundsStdDev);
return result;
return holder.getEstimateWithErrorBounds(errorBoundsStdDev);
} else {
return sketch.getEstimate();
return holder.getEstimate();
}
}

View File

@ -0,0 +1,293 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Longs;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.AnotB;
import com.yahoo.sketches.theta.Intersection;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import org.apache.commons.codec.binary.Base64;
import java.util.Comparator;
/**
*/
public class SketchHolder
{
public static final SketchHolder EMPTY = SketchHolder.of(
Sketches.updateSketchBuilder()
.build()
.compact(true, null)
);
public static final Comparator<Object> COMPARATOR = Ordering.from(
new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
SketchHolder h1 = (SketchHolder) o1;
SketchHolder h2 = (SketchHolder) o2;
if (h1.obj instanceof Sketch || h1.obj instanceof Union) {
if (h2.obj instanceof Sketch || h2.obj instanceof Union) {
return SKETCH_COMPARATOR.compare(h1.getSketch(), h2.getSketch());
} else {
return -1;
}
}
if (h1.obj instanceof Memory) {
if (h2.obj instanceof Memory) {
return MEMORY_COMPARATOR.compare((Memory) h1.obj, (Memory) h2.obj);
} else {
return 1;
}
}
throw new IAE("Unknwon types [%s] and [%s]", h1.obj.getClass().getName(), h2.obj.getClass().getName());
}
}
).nullsFirst();
private static final Comparator<Sketch> SKETCH_COMPARATOR = new Comparator<Sketch>()
{
@Override
public int compare(Sketch o1, Sketch o2)
{
return Doubles.compare(o1.getEstimate(), o2.getEstimate());
}
};
private static final Comparator<Memory> MEMORY_COMPARATOR = new Comparator<Memory>()
{
@Override
public int compare(Memory o1, Memory o2)
{
// We have two Ordered Compact sketches, so just compare their last entry if they have the size.
// This is to produce a deterministic ordering, though it might not match the actual estimate
// ordering, but that's ok because this comparator is only used by GenericIndexed
int retVal = Longs.compare(o1.getCapacity(), o2.getCapacity());
if (retVal == 0) {
retVal = Longs.compare(o1.getLong(o2.getCapacity() - 8), o2.getLong(o2.getCapacity() - 8));
}
return retVal;
}
};
private final Object obj;
private volatile Double cachedEstimate = null;
private volatile Sketch cachedSketch = null;
private SketchHolder(Object obj)
{
Preconditions.checkArgument(
obj instanceof Sketch || obj instanceof Union || obj instanceof Memory,
"unknown sketch representation type [%s]", obj.getClass().getName()
);
this.obj = obj;
}
public static SketchHolder of(Object obj)
{
return new SketchHolder(obj);
}
public void updateUnion(Union union)
{
if (obj instanceof Memory) {
union.update((Memory) obj);
} else {
union.update(getSketch());
}
}
public Sketch getSketch()
{
if (cachedSketch != null) {
return cachedSketch;
}
if (obj instanceof Sketch) {
cachedSketch = (Sketch) obj;
} else if (obj instanceof Union) {
cachedSketch = ((Union) obj).getResult();
} else if (obj instanceof Memory) {
cachedSketch = deserializeFromMemory((Memory) obj);
} else {
throw new ISE("Can't get sketch from object of type [%s]", obj.getClass().getName());
}
return cachedSketch;
}
public double getEstimate()
{
if (cachedEstimate == null) {
cachedEstimate = getSketch().getEstimate();
}
return cachedEstimate.doubleValue();
}
public SketchEstimateWithErrorBounds getEstimateWithErrorBounds(int errorBoundsStdDev)
{
Sketch sketch = getSketch();
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
errorBoundsStdDev);
return result;
}
public static Object combine(Object o1, Object o2, int nomEntries)
{
SketchHolder holder1 = (SketchHolder) o1;
SketchHolder holder2 = (SketchHolder) o2;
if (holder1.obj instanceof Union) {
Union union = (Union) holder1.obj;
holder2.updateUnion(union);
holder1.invalidateCache();
return holder1;
} else if (holder2.obj instanceof Union) {
Union union = (Union) holder2.obj;
holder1.updateUnion(union);
holder2.invalidateCache();
return holder2;
} else {
Union union = (Union) SetOperation.builder().build(nomEntries, Family.UNION);
holder1.updateUnion(union);
holder2.updateUnion(union);
return SketchHolder.of(union);
}
}
private void invalidateCache()
{
cachedEstimate = null;
cachedSketch = null;
}
public static SketchHolder deserialize(Object serializedSketch)
{
if (serializedSketch instanceof String) {
return SketchHolder.of(deserializeFromBase64EncodedString((String) serializedSketch));
} else if (serializedSketch instanceof byte[]) {
return SketchHolder.of(deserializeFromByteArray((byte[]) serializedSketch));
} else if (serializedSketch instanceof SketchHolder) {
return (SketchHolder) serializedSketch;
} else if (serializedSketch instanceof Sketch
|| serializedSketch instanceof Union
|| serializedSketch instanceof Memory) {
return SketchHolder.of(serializedSketch);
}
throw new ISE(
"Object is not of a type[%s] that can be deserialized to sketch.",
serializedSketch.getClass()
);
}
private static Sketch deserializeFromBase64EncodedString(String str)
{
return deserializeFromByteArray(
Base64.decodeBase64(
str.getBytes(Charsets.UTF_8)
)
);
}
private static Sketch deserializeFromByteArray(byte[] data)
{
return deserializeFromMemory(new NativeMemory(data));
}
private static Sketch deserializeFromMemory(Memory mem)
{
if (Sketch.getSerializationVersion(mem) < 3) {
return Sketches.heapifySketch(mem);
} else {
return Sketches.wrapSketch(mem);
}
}
public static enum Func
{
UNION,
INTERSECT,
NOT;
}
public static SketchHolder sketchSetOperation(Func func, int sketchSize, Object... holders)
{
//in the code below, I am returning SetOp.getResult(false, null)
//"false" gets us an unordered sketch which is faster to build
//"true" returns an ordered sketch but slower to compute. advantage of ordered sketch
//is that they are faster to "union" later but given that this method is used in
//the final stages of query processing, ordered sketch would be of no use.
switch (func) {
case UNION:
Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION);
for (Object o : holders) {
((SketchHolder) o).updateUnion(union);
}
return SketchHolder.of(union);
case INTERSECT:
Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION);
for (Object o : holders) {
intersection.update(((SketchHolder) o).getSketch());
}
return SketchHolder.of(intersection.getResult(false, null));
case NOT:
if (holders.length < 1) {
throw new IllegalArgumentException("A-Not-B requires atleast 1 sketch");
}
if (holders.length == 1) {
return (SketchHolder) holders[0];
}
Sketch result = ((SketchHolder) holders[0]).getSketch();
for (int i = 1; i < holders.length; i++) {
AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B);
anotb.update(result, ((SketchHolder) holders[i]).getSketch());
result = anotb.getResult(false, null);
}
return SketchHolder.of(result);
default:
throw new IllegalArgumentException("Unknown sketch operation " + func);
}
}
}

View File

@ -20,19 +20,17 @@
package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.yahoo.sketches.theta.Sketch;
import java.io.IOException;
public class SketchJsonSerializer extends JsonSerializer<Sketch>
public class SketchHolderJsonSerializer extends JsonSerializer<SketchHolder>
{
@Override
public void serialize(Sketch sketch, JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
public void serialize(SketchHolder sketchHolder, JsonGenerator jgen, SerializerProvider provider)
throws IOException
{
jgen.writeBinary(sketch.toByteArray());
jgen.writeBinary(sketchHolder.getSketch().toByteArray());
}
}

View File

@ -21,7 +21,6 @@ package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
@ -123,16 +122,11 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
public Object finalizeComputation(Object object)
{
if (shouldFinalize) {
Sketch sketch = SketchAggregatorFactory.toSketch(object);
SketchHolder holder = (SketchHolder) object;
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
errorBoundsStdDev);
return result;
return holder.getEstimateWithErrorBounds(errorBoundsStdDev);
} else {
return sketch.getEstimate();
return holder.getEstimate();
}
} else {
return object;

View File

@ -19,7 +19,6 @@
package io.druid.query.aggregation.datasketches.theta;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.Sketch;
import io.druid.data.input.InputRow;
import io.druid.segment.column.ColumnBuilder;
@ -49,17 +48,17 @@ public class SketchMergeComplexMetricSerde extends ComplexMetricSerde
@Override
public Class<?> extractedClass()
{
return Sketch.class;
return Object.class;
}
@Override
public Object extractValue(InputRow inputRow, String metricName)
{
final Object object = inputRow.getRaw(metricName);
if (object == null || object instanceof Sketch || object instanceof Memory) {
if (object == null) {
return object;
}
return SketchOperations.deserialize(object);
return SketchHolder.deserialize(object);
}
};
}

View File

@ -23,9 +23,6 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.initialization.DruidModule;
import io.druid.segment.serde.ComplexMetrics;
@ -69,13 +66,7 @@ public class SketchModule implements DruidModule
new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG)
)
.addSerializer(
Sketch.class, new SketchJsonSerializer()
)
.addSerializer(
Memory.class, new MemoryJsonSerializer()
)
.addSerializer(
Union.class, new UnionJsonSerializer()
SketchHolder.class, new SketchHolderJsonSerializer()
)
);
}

View File

@ -19,13 +19,9 @@
package io.druid.query.aggregation.datasketches.theta;
import com.google.common.primitives.Longs;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.memory.MemoryRegion;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.segment.data.ObjectStrategy;
@ -35,74 +31,38 @@ public class SketchObjectStrategy implements ObjectStrategy
{
private static final byte[] EMPTY_BYTES = new byte[]{};
private static final Sketch EMPTY_SKETCH = Sketches.updateSketchBuilder().build().compact(true, null);
@Override
public int compare(Object s1, Object s2)
{
if (s1 instanceof Sketch || s1 instanceof Union) {
if (s2 instanceof Sketch || s2 instanceof Union) {
return SketchAggregatorFactory.COMPARATOR.compare(s1, s2);
} else {
return -1;
}
}
if (s1 instanceof Memory) {
if (s2 instanceof Memory) {
Memory s1Mem = (Memory) s1;
Memory s2Mem = (Memory) s2;
// We have two Ordered Compact sketches, so just compare their last entry if they have the size.
// This is to produce a deterministic ordering, though it might not match the actual estimate
// ordering, but that's ok because this comparator is only used by GenericIndexed
int retVal = Longs.compare(s1Mem.getCapacity(), s2Mem.getCapacity());
if (retVal == 0) {
retVal = Longs.compare(s1Mem.getLong(s1Mem.getCapacity() - 8), s2Mem.getLong(s2Mem.getCapacity() - 8));
}
return retVal;
} else {
return 1;
}
}
throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1);
return SketchHolder.COMPARATOR.compare(s1, s2);
}
@Override
public Class<? extends Sketch> getClazz()
public Class<?> getClazz()
{
return Sketch.class;
return Object.class;
}
@Override
public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
{
if (numBytes == 0) {
return EMPTY_SKETCH;
return SketchHolder.EMPTY;
}
return new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes);
return SketchHolder.of(new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes));
}
@Override
public byte[] toBytes(Object obj)
{
if (obj instanceof Sketch) {
Sketch sketch = (Sketch) obj;
if (obj instanceof SketchHolder) {
Sketch sketch = ((SketchHolder) obj).getSketch();
if (sketch.isEmpty()) {
return EMPTY_BYTES;
}
return sketch.toByteArray();
} else if (obj instanceof Memory) {
Memory mem = (Memory) obj;
byte[] retVal = new byte[(int) mem.getCapacity()];
mem.getByteArray(0, retVal, 0, (int) mem.getCapacity());
return retVal;
} else if (obj instanceof Union) {
return toBytes(((Union) obj).getResult(true, null));
} else if (obj == null) {
return EMPTY_BYTES;
} else {

View File

@ -1,132 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import com.google.common.base.Charsets;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.AnotB;
import com.yahoo.sketches.theta.Intersection;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.logger.Logger;
import org.apache.commons.codec.binary.Base64;
public class SketchOperations
{
private static final Logger LOG = new Logger(SketchOperations.class);
public static final Sketch EMPTY_SKETCH = Sketches.updateSketchBuilder().build().compact(true, null);
public static enum Func
{
UNION,
INTERSECT,
NOT;
}
public static Sketch deserialize(Object serializedSketch)
{
if (serializedSketch instanceof String) {
return deserializeFromBase64EncodedString((String) serializedSketch);
} else if (serializedSketch instanceof byte[]) {
return deserializeFromByteArray((byte[]) serializedSketch);
} else if (serializedSketch instanceof Sketch) {
return (Sketch) serializedSketch;
} else if (serializedSketch instanceof Union) {
return ((Union) serializedSketch).getResult(true, null);
}
throw new IllegalStateException(
"Object is not of a type that can deserialize to sketch: "
+ serializedSketch.getClass()
);
}
public static Sketch deserializeFromBase64EncodedString(String str)
{
return deserializeFromByteArray(
Base64.decodeBase64(
str.getBytes(Charsets.UTF_8)
)
);
}
public static Sketch deserializeFromByteArray(byte[] data)
{
return deserializeFromMemory(new NativeMemory(data));
}
public static Sketch deserializeFromMemory(Memory mem)
{
if (Sketch.getSerializationVersion(mem) < 3) {
return Sketches.heapifySketch(mem);
} else {
return Sketches.wrapSketch(mem);
}
}
public static Sketch sketchSetOperation(Func func, int sketchSize, Sketch... sketches)
{
//in the code below, I am returning SetOp.getResult(false, null)
//"false" gets us an unordered sketch which is faster to build
//"true" returns an ordered sketch but slower to compute. advantage of ordered sketch
//is that they are faster to "union" later but given that this method is used in
//the final stages of query processing, ordered sketch would be of no use.
switch (func) {
case UNION:
Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION);
for (Sketch sketch : sketches) {
union.update(sketch);
}
return union.getResult(false, null);
case INTERSECT:
Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION);
for (Sketch sketch : sketches) {
intersection.update(sketch);
}
return intersection.getResult(false, null);
case NOT:
if (sketches.length < 1) {
throw new IllegalArgumentException("A-Not-B requires atleast 1 sketch");
}
if (sketches.length == 1) {
return sketches[0];
}
Sketch result = sketches[0];
for (int i = 1; i < sketches.length; i++) {
AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B);
anotb.update(result, sketches[i]);
result = anotb.getResult(false, null);
}
return result;
default:
throw new IllegalArgumentException("Unknown sketch operation " + func);
}
}
}

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.theta.Sketch;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.PostAggregator;
@ -40,7 +39,7 @@ public class SketchSetPostAggregator implements PostAggregator
private final String name;
private final List<PostAggregator> fields;
private final SketchOperations.Func func;
private final SketchHolder.Func func;
private final int maxSketchSize;
@JsonCreator
@ -53,7 +52,7 @@ public class SketchSetPostAggregator implements PostAggregator
{
this.name = name;
this.fields = fields;
this.func = SketchOperations.Func.valueOf(func);
this.func = SketchHolder.Func.valueOf(func);
this.maxSketchSize = maxSize == null ? SketchAggregatorFactory.DEFAULT_MAX_SKETCH_SIZE : maxSize;
Util.checkIfPowerOf2(this.maxSketchSize, "size");
@ -75,18 +74,18 @@ public class SketchSetPostAggregator implements PostAggregator
@Override
public Comparator<Object> getComparator()
{
return SketchAggregatorFactory.COMPARATOR;
return SketchHolder.COMPARATOR;
}
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
Sketch[] sketches = new Sketch[fields.size()];
Object[] sketches = new Object[fields.size()];
for (int i = 0; i < sketches.length; i++) {
sketches[i] = SketchAggregatorFactory.toSketch(fields.get(i).compute(combinedAggregators));
sketches[i] = fields.get(i).compute(combinedAggregators);
}
return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches);
return SketchHolder.sketchSetOperation(func, maxSketchSize, sketches);
}
@Override

View File

@ -1,40 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.yahoo.sketches.theta.Union;
import java.io.IOException;
/**
*/
public class UnionJsonSerializer extends JsonSerializer<Union>
{
@Override
public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeBinary(union.getResult(true, null).toByteArray());
}
}

View File

@ -23,10 +23,10 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.yahoo.sketches.theta.Sketch;
import io.druid.initialization.DruidModule;
import io.druid.query.aggregation.datasketches.theta.SketchBuildComplexMetricSerde;
import io.druid.query.aggregation.datasketches.theta.SketchJsonSerializer;
import io.druid.query.aggregation.datasketches.theta.SketchHolder;
import io.druid.query.aggregation.datasketches.theta.SketchHolderJsonSerializer;
import io.druid.query.aggregation.datasketches.theta.SketchMergeComplexMetricSerde;
import io.druid.query.aggregation.datasketches.theta.SketchModule;
import io.druid.segment.serde.ComplexMetrics;
@ -80,7 +80,7 @@ public class OldApiSketchModule implements DruidModule
new NamedType(OldSketchEstimatePostAggregator.class, "sketchEstimate"),
new NamedType(OldSketchSetPostAggregator.class, "sketchSetOper")
)
.addSerializer(Sketch.class, new SketchJsonSerializer())
.addSerializer(SketchHolder.class, new SketchHolderJsonSerializer())
);
}
}

View File

@ -206,7 +206,7 @@ public class SketchAggregationTest
@Test
public void testSketchMergeFinalization() throws Exception
{
Sketch sketch = Sketches.updateSketchBuilder().build(128);
SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().build(128));
SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);
@ -344,7 +344,7 @@ public class SketchAggregationTest
@Test
public void testSketchAggregatorFactoryComparator()
{
Comparator<Object> comparator = SketchAggregatorFactory.COMPARATOR;
Comparator<Object> comparator = SketchHolder.COMPARATOR;
Assert.assertEquals(0, comparator.compare(null, null));
Union union1 = (Union) SetOperation.builder().build(1<<4, Family.UNION);
@ -352,8 +352,8 @@ public class SketchAggregationTest
union1.update("b");
Sketch sketch1 = union1.getResult();
Assert.assertEquals(-1, comparator.compare(null, sketch1));
Assert.assertEquals(1, comparator.compare(sketch1, null));
Assert.assertEquals(-1, comparator.compare(null, SketchHolder.of(sketch1)));
Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch1), null));
Union union2 = (Union) SetOperation.builder().build(1<<4, Family.UNION);
union2.update("a");
@ -361,12 +361,12 @@ public class SketchAggregationTest
union2.update("c");
Sketch sketch2 = union2.getResult();
Assert.assertEquals(-1, comparator.compare(sketch1, sketch2));
Assert.assertEquals(-1, comparator.compare(sketch1, union2));
Assert.assertEquals(1, comparator.compare(sketch2, sketch1));
Assert.assertEquals(1, comparator.compare(sketch2, union1));
Assert.assertEquals(1, comparator.compare(union2, union1));
Assert.assertEquals(1, comparator.compare(union2, sketch1));
Assert.assertEquals(-1, comparator.compare(SketchHolder.of(sketch1), SketchHolder.of(sketch2)));
Assert.assertEquals(-1, comparator.compare(SketchHolder.of(sketch1), SketchHolder.of(union2)));
Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch2), SketchHolder.of(sketch1)));
Assert.assertEquals(1, comparator.compare(SketchHolder.of(sketch2), SketchHolder.of(union1)));
Assert.assertEquals(1, comparator.compare(SketchHolder.of(union2), SketchHolder.of(union1)));
Assert.assertEquals(1, comparator.compare(SketchHolder.of(union2), SketchHolder.of(sketch1)));
}
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception