SketchAggregatorFactory.combine(..) returns Union object now so that it can be reused across multiple combine(..) calls (#3471)

This commit is contained in:
Himanshu 2016-10-05 10:40:14 -05:00 committed by Gian Merlino
parent 592903571a
commit 1523de08fb
7 changed files with 82 additions and 7 deletions

View File

@ -107,6 +107,8 @@ public class SketchAggregator implements Aggregator
union.update((Memory) update); union.update((Memory) update);
} else if (update instanceof Sketch) { } else if (update instanceof Sketch) {
union.update((Sketch) update); union.update((Sketch) update);
} else if (update instanceof Union) {
union.update(((Union) update).getResult(false, null));
} else if (update instanceof String) { } else if (update instanceof String) {
union.update((String) update); union.update((String) update);
} else if (update instanceof byte[]) { } else if (update instanceof byte[]) {

View File

@ -110,10 +110,21 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
@Override @Override
public Object combine(Object lhs, Object rhs) public Object combine(Object lhs, Object rhs)
{ {
Union union = (Union) SetOperation.builder().build(size, Family.UNION); final Union union;
updateUnion(union, lhs); if (lhs instanceof Union) {
updateUnion(union, rhs); union = (Union) lhs;
return union.getResult(false, null); updateUnion(union, rhs);
} else if (rhs instanceof Union) {
union = (Union) rhs;
updateUnion(union, lhs);
} else {
union = (Union) SetOperation.builder().build(size, Family.UNION);
updateUnion(union, lhs);
updateUnion(union, rhs);
}
return union;
} }
private void updateUnion(Union union, Object obj) private void updateUnion(Union union, Object obj)
@ -124,6 +135,8 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
union.update((Memory) obj); union.update((Memory) obj);
} else if (obj instanceof Sketch) { } else if (obj instanceof Sketch) {
union.update((Sketch) obj); union.update((Sketch) obj);
} else if (obj instanceof Union) {
union.update(((Union) obj).getResult(false, null));
} else { } else {
throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName()); throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName());
} }

View File

@ -82,7 +82,7 @@ public class SketchEstimatePostAggregator implements PostAggregator
@Override @Override
public Object compute(Map<String, Object> combinedAggregators) public Object compute(Map<String, Object> combinedAggregators)
{ {
Sketch sketch = (Sketch) field.compute(combinedAggregators); Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(combinedAggregators));
if (errorBoundsStdDev != null) { if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds( SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(), sketch.getEstimate(),

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.yahoo.sketches.memory.Memory; import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.initialization.DruidModule; import io.druid.initialization.DruidModule;
import io.druid.segment.serde.ComplexMetrics; import io.druid.segment.serde.ComplexMetrics;
@ -71,7 +72,11 @@ public class SketchModule implements DruidModule
Sketch.class, new SketchJsonSerializer() Sketch.class, new SketchJsonSerializer()
) )
.addSerializer( .addSerializer(
Memory.class, new MemoryJsonSerializer()) Memory.class, new MemoryJsonSerializer()
)
.addSerializer(
Union.class, new UnionJsonSerializer()
)
); );
} }
} }

View File

@ -26,6 +26,7 @@ import com.yahoo.sketches.memory.MemoryRegion;
import com.yahoo.sketches.memory.NativeMemory; import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches; import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.segment.data.ObjectStrategy; import io.druid.segment.data.ObjectStrategy;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -98,6 +99,8 @@ public class SketchObjectStrategy implements ObjectStrategy
byte[] retVal = new byte[(int) mem.getCapacity()]; byte[] retVal = new byte[(int) mem.getCapacity()];
mem.getByteArray(0, retVal, 0, (int) mem.getCapacity()); mem.getByteArray(0, retVal, 0, (int) mem.getCapacity());
return retVal; return retVal;
} else if (obj instanceof Union) {
return toBytes(((Union) obj).getResult(true, null));
} else if (obj == null) { } else if (obj == null) {
return EMPTY_BYTES; return EMPTY_BYTES;
} else { } else {

View File

@ -26,6 +26,7 @@ import com.metamx.common.IAE;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.yahoo.sketches.Util; import com.yahoo.sketches.Util;
import com.yahoo.sketches.theta.Sketch; import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.PostAggregator;
import java.util.Comparator; import java.util.Comparator;
@ -83,12 +84,23 @@ public class SketchSetPostAggregator implements PostAggregator
{ {
Sketch[] sketches = new Sketch[fields.size()]; Sketch[] sketches = new Sketch[fields.size()];
for (int i = 0; i < sketches.length; i++) { for (int i = 0; i < sketches.length; i++) {
sketches[i] = (Sketch) fields.get(i).compute(combinedAggregators); sketches[i] = toSketch(fields.get(i).compute(combinedAggregators));
} }
return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches); return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches);
} }
public final static Sketch toSketch(Object obj)
{
if (obj instanceof Sketch) {
return (Sketch) obj;
} else if (obj instanceof Union) {
return ((Union) obj).getResult(true, null);
} else {
throw new IAE("Can't convert to Sketch object [%s]", obj.getClass());
}
}
@Override @Override
@JsonProperty @JsonProperty
public String getName() public String getName()

View File

@ -0,0 +1,40 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.yahoo.sketches.theta.Union;
import java.io.IOException;
/**
*/
public class UnionJsonSerializer extends JsonSerializer<Union>
{
@Override
public void serialize(Union union, JsonGenerator jgen, SerializerProvider provider)
throws IOException, JsonProcessingException
{
jgen.writeBinary(union.getResult(true, null).toByteArray());
}
}