Sketch cache key should include size, isInputThetaSketch. (#2893)

This commit is contained in:
Gian Merlino 2016-04-28 10:10:46 -07:00 committed by Fangjin Yang
parent 90ce03c66f
commit 909abd17f3
3 changed files with 55 additions and 5 deletions

View File

@ -22,6 +22,7 @@ package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Ints;
import com.metamx.common.IAE;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.Util;
@ -169,7 +170,11 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
public byte[] getCacheKey()
{
byte[] fieldNameBytes = fieldName.getBytes();
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(cacheId).put(fieldNameBytes).array();
return ByteBuffer.allocate(1 + Ints.BYTES + fieldNameBytes.length)
.put(cacheId)
.putInt(size)
.put(fieldNameBytes)
.array();
}
@Override

View File

@ -25,6 +25,7 @@ import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.AggregatorFactoryNotMergeableException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@ -104,7 +105,7 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
{
return isInputThetaSketch;
}
@JsonProperty
public Integer getErrorBoundsStdDev()
{
@ -126,9 +127,9 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
Sketch sketch = (Sketch) object;
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
sketch.getEstimate(),
sketch.getUpperBound(errorBoundsStdDev),
sketch.getLowerBound(errorBoundsStdDev),
errorBoundsStdDev);
return result;
} else {
@ -149,6 +150,17 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
}
}
@Override
public byte[] getCacheKey()
{
final byte[] superCacheKey = super.getCacheKey();
return ByteBuffer.allocate(superCacheKey.length + 1)
.put(superCacheKey)
.put(isInputThetaSketch ? (byte) 1 : (byte) 0)
.array();
}
@Override
public boolean equals(Object o)
{

View File

@ -46,6 +46,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
/**
@ -384,6 +385,38 @@ public class SketchAggregationTest
);
}
@Test
public void testCacheKey()
{
final SketchMergeAggregatorFactory factory1 = new SketchMergeAggregatorFactory(
"name",
"fieldName",
16,
null,
null,
null
);
final SketchMergeAggregatorFactory factory2 = new SketchMergeAggregatorFactory(
"name",
"fieldName",
16,
null,
null,
null
);
final SketchMergeAggregatorFactory factory3 = new SketchMergeAggregatorFactory(
"name",
"fieldName",
32,
null,
null,
null
);
Assert.assertTrue(Arrays.equals(factory1.getCacheKey(), factory2.getCacheKey()));
Assert.assertFalse(Arrays.equals(factory1.getCacheKey(), factory3.getCacheKey()));
}
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
{
Assert.assertEquals(