mirror of
https://github.com/apache/druid.git
synced 2025-03-03 15:59:16 +00:00
Merge pull request #1991 from himanshug/sketch_agg_upgrade
datasketches module updates
This commit is contained in:
commit
6305dfe1b9
@ -5,6 +5,7 @@ layout: doc_page
|
||||
## DataSketches aggregator
|
||||
Druid aggregators based on [datasketches]()http://datasketches.github.io/) library. Note that sketch algorithms are approxiate, see details in the "Accuracy" section of the datasketches doc.
|
||||
At ingestion time, this aggregator creates the theta sketch objects which get stored in Druid segments. Logically speaking, a theta sketch object can be thought of as a Set data structure. At query time, sketches are read and aggregated(set unioned) together. In the end, by default, you receive the estimate of number of unique entries in the sketch object. Also, You can use post aggregators to do union, intersection or difference on sketch columns in the same row.
|
||||
Note that you can use `thetaSketch` aggregator on columns which were not ingested using same, it will return estimated cardinality of the column. It is recommended to use it at ingestion time as well to make querying faster.
|
||||
|
||||
### Aggregators
|
||||
|
||||
|
@ -18,102 +18,103 @@
|
||||
~ under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<groupId>io.druid.extensions</groupId>
|
||||
<artifactId>druid-datasketches</artifactId>
|
||||
<name>druid-datasketches</name>
|
||||
<description>Druid Aggregators based on datasketches lib http://datasketches.github.io/</description>
|
||||
<groupId>io.druid.extensions</groupId>
|
||||
<artifactId>druid-datasketches</artifactId>
|
||||
<name>druid-datasketches</name>
|
||||
<description>Druid Aggregators based on datasketches lib http://datasketches.github.io/</description>
|
||||
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
<parent>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid</artifactId>
|
||||
<version>0.9.0-SNAPSHOT</version>
|
||||
<relativePath>../../pom.xml</relativePath>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.datasketches</groupId>
|
||||
<artifactId>sketches-core</artifactId>
|
||||
<version>0.1.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-api</artifactId>
|
||||
<version>${druid.api.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.yahoo.datasketches</groupId>
|
||||
<artifactId>sketches-core</artifactId>
|
||||
<version>0.2.2</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-api</artifactId>
|
||||
<version>${druid.api.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-guava</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-joda</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-smile</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
||||
<artifactId>jackson-jaxrs-json-provider</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
||||
<artifactId>jackson-jaxrs-smile-provider</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-guava</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.datatype</groupId>
|
||||
<artifactId>jackson-datatype-joda</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.dataformat</groupId>
|
||||
<artifactId>jackson-dataformat-smile</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
||||
<artifactId>jackson-jaxrs-json-provider</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.jaxrs</groupId>
|
||||
<artifactId>jackson-jaxrs-smile-provider</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Test Dependencies -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<!-- Test Dependencies -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.druid</groupId>
|
||||
<artifactId>druid-processing</artifactId>
|
||||
<version>${project.parent.version}</version>
|
||||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
@ -19,17 +19,22 @@
|
||||
|
||||
package io.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.metamx.common.ISE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.yahoo.sketches.Family;
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.theta.SetOpReturnState;
|
||||
import com.yahoo.sketches.theta.SetOperation;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import com.yahoo.sketches.theta.Union;
|
||||
import io.druid.query.aggregation.Aggregator;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class SketchAggregator implements Aggregator
|
||||
{
|
||||
private static final Logger logger = new Logger(SketchAggregator.class);
|
||||
|
||||
private final ObjectColumnSelector selector;
|
||||
private final String name;
|
||||
private final int size;
|
||||
@ -48,21 +53,11 @@ public class SketchAggregator implements Aggregator
|
||||
public void aggregate()
|
||||
{
|
||||
Object update = selector.get();
|
||||
|
||||
if(update == null) {
|
||||
if (update == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
SetOpReturnState success;
|
||||
if (update instanceof Memory) {
|
||||
success = union.update((Memory) update);
|
||||
} else {
|
||||
success = union.update((Sketch) update);
|
||||
}
|
||||
|
||||
if(success != SetOpReturnState.Success) {
|
||||
throw new IllegalStateException("Sketch Aggregation failed with state " + success);
|
||||
}
|
||||
updateUnion(union, update);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -105,4 +100,31 @@ public class SketchAggregator implements Aggregator
|
||||
{
|
||||
union = null;
|
||||
}
|
||||
|
||||
static void updateUnion(Union union, Object update)
|
||||
{
|
||||
if (update instanceof Memory) {
|
||||
union.update((Memory) update);
|
||||
} else if (update instanceof Sketch) {
|
||||
union.update((Sketch) update);
|
||||
} else if (update instanceof String) {
|
||||
union.update((String) update);
|
||||
} else if (update instanceof byte[]) {
|
||||
union.update((byte[]) update);
|
||||
} else if (update instanceof Double) {
|
||||
union.update(((Double) update));
|
||||
} else if (update instanceof Integer || update instanceof Long) {
|
||||
union.update(((Number) update).longValue());
|
||||
} else if (update instanceof int[]) {
|
||||
union.update((int[]) update);
|
||||
} else if (update instanceof long[]) {
|
||||
union.update((long[]) update);
|
||||
} else if (update instanceof List) {
|
||||
for (Object entry : (List) update) {
|
||||
union.update(entry.toString());
|
||||
}
|
||||
} else {
|
||||
throw new ISE("Illegal type received while theta sketch merging [%s]", update.getClass());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -19,13 +19,12 @@
|
||||
|
||||
package io.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.yahoo.sketches.Family;
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.memory.MemoryRegion;
|
||||
import com.yahoo.sketches.memory.NativeMemory;
|
||||
import com.yahoo.sketches.theta.SetOpReturnState;
|
||||
import com.yahoo.sketches.theta.SetOperation;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import com.yahoo.sketches.theta.Union;
|
||||
import io.druid.query.aggregation.BufferAggregator;
|
||||
import io.druid.segment.ObjectColumnSelector;
|
||||
@ -36,6 +35,8 @@ import java.util.Map;
|
||||
|
||||
public class SketchBufferAggregator implements BufferAggregator
|
||||
{
|
||||
private static final Logger logger = new Logger(SketchAggregator.class);
|
||||
|
||||
private final ObjectColumnSelector selector;
|
||||
private final int size;
|
||||
private final int maxIntermediateSize;
|
||||
@ -59,28 +60,19 @@ public class SketchBufferAggregator implements BufferAggregator
|
||||
}
|
||||
|
||||
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
|
||||
unions.put(position, (Union) SetOperation.builder().setMemory(mem).build(size, Family.UNION));
|
||||
unions.put(position, (Union) SetOperation.builder().initMemory(mem).build(size, Family.UNION));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void aggregate(ByteBuffer buf, int position)
|
||||
{
|
||||
Object update = selector.get();
|
||||
if(update == null) {
|
||||
if (update == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Union union = getUnion(buf, position);
|
||||
SetOpReturnState success;
|
||||
if (update instanceof Memory) {
|
||||
success = union.update((Memory) update);
|
||||
} else {
|
||||
success = union.update((Sketch) update);
|
||||
}
|
||||
|
||||
if(success != SetOpReturnState.Success) {
|
||||
throw new IllegalStateException("Sketch Buffer Aggregation failed with state " + update);
|
||||
}
|
||||
SketchAggregator.updateUnion(union, update);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -98,7 +90,7 @@ public class SketchBufferAggregator implements BufferAggregator
|
||||
private Union getUnion(ByteBuffer buf, int position)
|
||||
{
|
||||
Union union = unions.get(position);
|
||||
if(union == null) {
|
||||
if (union == null) {
|
||||
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
|
||||
union = (Union) SetOperation.wrap(mem);
|
||||
unions.put(position, union);
|
||||
@ -119,7 +111,8 @@ public class SketchBufferAggregator implements BufferAggregator
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
public void close()
|
||||
{
|
||||
unions.clear();
|
||||
}
|
||||
|
||||
|
@ -19,16 +19,9 @@
|
||||
|
||||
package io.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.metamx.common.IAE;
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import com.yahoo.sketches.theta.Sketches;
|
||||
import com.yahoo.sketches.theta.UpdateSketch;
|
||||
import io.druid.data.input.InputRow;
|
||||
import io.druid.segment.serde.ComplexMetricExtractor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SketchBuildComplexMetricSerde extends SketchMergeComplexMetricSerde
|
||||
@ -48,33 +41,7 @@ public class SketchBuildComplexMetricSerde extends SketchMergeComplexMetricSerde
|
||||
@Override
|
||||
public Object extractValue(InputRow inputRow, String metricName)
|
||||
{
|
||||
Object obj = inputRow.getRaw(metricName);
|
||||
if (obj == null || obj instanceof Sketch || obj instanceof Memory) {
|
||||
return obj;
|
||||
}
|
||||
|
||||
UpdateSketch sketch = Sketches.updateSketchBuilder().build(4096);
|
||||
if (obj instanceof String) {
|
||||
sketch.update((String) obj);
|
||||
} else if (obj instanceof byte[]) {
|
||||
sketch.update((byte[]) obj);
|
||||
} else if (obj instanceof Double) {
|
||||
sketch.update(((Double) obj));
|
||||
} else if (obj instanceof Integer || obj instanceof Long) {
|
||||
sketch.update(((Number) obj).longValue());
|
||||
} else if (obj instanceof int[]) {
|
||||
sketch.update((int[]) obj);
|
||||
} else if (obj instanceof long[]) {
|
||||
sketch.update((long[]) obj);
|
||||
} else if (obj instanceof List) {
|
||||
for (Object entry : (List) obj) {
|
||||
sketch.update(entry.toString());
|
||||
}
|
||||
} else {
|
||||
throw new IAE("Unknown object type[%s] received for ingestion.", obj.getClass());
|
||||
}
|
||||
|
||||
return sketch;
|
||||
return inputRow.getRaw(metricName);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -30,7 +30,8 @@ import java.io.IOException;
|
||||
public class SketchJsonSerializer extends JsonSerializer<Sketch>
|
||||
{
|
||||
@Override
|
||||
public void serialize(Sketch sketch, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException
|
||||
public void serialize(Sketch sketch, JsonGenerator jgen, SerializerProvider provider)
|
||||
throws IOException, JsonProcessingException
|
||||
{
|
||||
jgen.writeBinary(sketch.toByteArray());
|
||||
}
|
||||
|
@ -86,6 +86,7 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
|
||||
* sketch.
|
||||
*
|
||||
* @param object the sketch object
|
||||
*
|
||||
* @return sketch object
|
||||
*/
|
||||
@Override
|
||||
|
@ -25,7 +25,6 @@ import com.yahoo.sketches.Family;
|
||||
import com.yahoo.sketches.memory.NativeMemory;
|
||||
import com.yahoo.sketches.theta.AnotB;
|
||||
import com.yahoo.sketches.theta.Intersection;
|
||||
import com.yahoo.sketches.theta.SetOpReturnState;
|
||||
import com.yahoo.sketches.theta.SetOperation;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import com.yahoo.sketches.theta.Sketches;
|
||||
@ -74,13 +73,13 @@ public class SketchOperations
|
||||
public static Sketch deserializeFromByteArray(byte[] data)
|
||||
{
|
||||
NativeMemory mem = new NativeMemory(data);
|
||||
if(Sketch.getSerializationVersion(mem) < 3) {
|
||||
if (Sketch.getSerializationVersion(mem) < 3) {
|
||||
return Sketches.heapifySketch(mem);
|
||||
} else {
|
||||
return Sketches.wrapSketch(mem);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static Sketch sketchSetOperation(Func func, int sketchSize, Sketch... sketches)
|
||||
{
|
||||
//in the code below, I am returning SetOp.getResult(false, null)
|
||||
@ -91,34 +90,29 @@ public class SketchOperations
|
||||
switch (func) {
|
||||
case UNION:
|
||||
Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION);
|
||||
for(Sketch sketch : sketches) {
|
||||
SetOpReturnState success = union.update(sketch);
|
||||
if(success != SetOpReturnState.Success) {
|
||||
throw new IllegalStateException("Sketch operation failed " + func);
|
||||
}
|
||||
for (Sketch sketch : sketches) {
|
||||
union.update(sketch);
|
||||
}
|
||||
return union.getResult(false, null);
|
||||
case INTERSECT:
|
||||
Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION);
|
||||
for(Sketch sketch : sketches) {
|
||||
SetOpReturnState success = intersection.update(sketch);
|
||||
if(success != SetOpReturnState.Success) {
|
||||
throw new IllegalStateException("Sketch operation failed " + func);
|
||||
}
|
||||
for (Sketch sketch : sketches) {
|
||||
intersection.update(sketch);
|
||||
}
|
||||
return intersection.getResult(false, null);
|
||||
case NOT:
|
||||
if(sketches.length < 2) {
|
||||
throw new IllegalArgumentException("A-Not-B requires atleast 2 sketches");
|
||||
if (sketches.length < 1) {
|
||||
throw new IllegalArgumentException("A-Not-B requires atleast 1 sketch");
|
||||
}
|
||||
|
||||
if (sketches.length == 1) {
|
||||
return sketches[0];
|
||||
}
|
||||
|
||||
Sketch result = sketches[0];
|
||||
for (int i = 1; i < sketches.length; i++) {
|
||||
AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B);
|
||||
SetOpReturnState success = anotb.update(result, sketches[i]);
|
||||
if(success != SetOpReturnState.Success) {
|
||||
throw new IllegalStateException("Sketch operation failed " + func);
|
||||
}
|
||||
anotb.update(result, sketches[i]);
|
||||
result = anotb.getResult(false, null);
|
||||
}
|
||||
return result;
|
||||
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.logger.Logger;
|
||||
|
||||
import com.yahoo.sketches.Util;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
@ -118,7 +117,17 @@ public class SketchSetPostAggregator implements PostAggregator
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return "SketchSetPostAggregator{" + "name='" + name + '\'' + ", fields=" + fields + ", func=" + func + ", size=" + maxSketchSize +"}";
|
||||
return "SketchSetPostAggregator{"
|
||||
+ "name='"
|
||||
+ name
|
||||
+ '\''
|
||||
+ ", fields="
|
||||
+ fields
|
||||
+ ", func="
|
||||
+ func
|
||||
+ ", size="
|
||||
+ maxSketchSize
|
||||
+ "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,7 +21,6 @@ package io.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.theta.CompactSketch;
|
||||
import com.yahoo.sketches.theta.SetOpReturnState;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import com.yahoo.sketches.theta.Union;
|
||||
|
||||
@ -37,15 +36,51 @@ public class SynchronizedUnion implements Union
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized SetOpReturnState update(Sketch sketch)
|
||||
public synchronized void update(Sketch sketchIn)
|
||||
{
|
||||
return delegate.update(sketch);
|
||||
delegate.update(sketchIn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized SetOpReturnState update(Memory memory)
|
||||
public synchronized void update(Memory mem)
|
||||
{
|
||||
return delegate.update(memory);
|
||||
delegate.update(mem);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void update(long datum)
|
||||
{
|
||||
delegate.update(datum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void update(double datum)
|
||||
{
|
||||
delegate.update(datum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void update(String datum)
|
||||
{
|
||||
delegate.update(datum);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void update(byte[] data)
|
||||
{
|
||||
delegate.update(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void update(int[] data)
|
||||
{
|
||||
delegate.update(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void update(long[] data)
|
||||
{
|
||||
delegate.update(data);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -54,6 +89,12 @@ public class SynchronizedUnion implements Union
|
||||
return delegate.getResult(b, memory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompactSketch getResult()
|
||||
{
|
||||
return delegate.getResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized byte[] toByteArray()
|
||||
{
|
||||
|
@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.inject.Binder;
|
||||
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.query.aggregation.datasketches.theta.SketchBuildComplexMetricSerde;
|
||||
|
@ -123,6 +123,43 @@ public class SketchAggregationTest
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThetaCardinalityOnSimpleColumn() throws Exception
|
||||
{
|
||||
Sequence seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(SketchAggregationTest.class.getClassLoader().getResource("simple_test_data.tsv").getFile()),
|
||||
readFileFromClasspathAsString("simple_test_data_record_parser2.json"),
|
||||
"["
|
||||
+ " {"
|
||||
+ " \"type\": \"count\","
|
||||
+ " \"name\": \"count\""
|
||||
+ " }"
|
||||
+ "]",
|
||||
0,
|
||||
QueryGranularity.NONE,
|
||||
5,
|
||||
readFileFromClasspathAsString("simple_test_data_group_by_query.json")
|
||||
);
|
||||
|
||||
List results = Sequences.toList(seq, Lists.newArrayList());
|
||||
Assert.assertEquals(1, results.size());
|
||||
Assert.assertEquals(
|
||||
new MapBasedRow(
|
||||
DateTime.parse("2014-10-19T00:00:00.000Z"),
|
||||
ImmutableMap
|
||||
.<String, Object>builder()
|
||||
.put("sketch_count", 50.0)
|
||||
.put("sketchEstimatePostAgg", 50.0)
|
||||
.put("sketchUnionPostAggEstimate", 50.0)
|
||||
.put("sketchIntersectionPostAggEstimate", 50.0)
|
||||
.put("sketchAnotBPostAggEstimate", 0.0)
|
||||
.put("non_existing_col_validation", 0.0)
|
||||
.build()
|
||||
),
|
||||
results.get(0)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSketchMergeAggregatorFactorySerde() throws Exception
|
||||
{
|
||||
@ -146,7 +183,8 @@ public class SketchAggregationTest
|
||||
Assert.assertEquals(sketch, agg.finalizeComputation(sketch));
|
||||
}
|
||||
|
||||
private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception{
|
||||
private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception
|
||||
{
|
||||
Assert.assertEquals(
|
||||
agg,
|
||||
helper.getObjectMapper().readValue(
|
||||
@ -183,7 +221,8 @@ public class SketchAggregationTest
|
||||
);
|
||||
}
|
||||
|
||||
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception{
|
||||
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
|
||||
{
|
||||
Assert.assertEquals(
|
||||
agg,
|
||||
helper.getObjectMapper().readValue(
|
||||
|
@ -139,7 +139,8 @@ public class OldApiSketchAggregationTest
|
||||
assertAggregatorFactorySerde(new OldSketchBuildAggregatorFactory("name", "fieldName", 16));
|
||||
}
|
||||
|
||||
private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception{
|
||||
private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception
|
||||
{
|
||||
Assert.assertEquals(
|
||||
agg,
|
||||
helper.getObjectMapper().readValue(
|
||||
@ -176,7 +177,8 @@ public class OldApiSketchAggregationTest
|
||||
);
|
||||
}
|
||||
|
||||
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception{
|
||||
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
|
||||
{
|
||||
Assert.assertEquals(
|
||||
agg,
|
||||
helper.getObjectMapper().readValue(
|
||||
|
@ -4,8 +4,18 @@
|
||||
"granularity": "ALL",
|
||||
"dimensions": [],
|
||||
"aggregations": [
|
||||
{ "type": "sketchMerge", "name": "sketch_count", "fieldName": "pty_country", "size": 16384 },
|
||||
{ "type": "sketchMerge", "name": "non_existing_col_validation", "fieldName": "non_existing_col", "size": 16384 }
|
||||
{
|
||||
"type": "sketchMerge",
|
||||
"name": "sketch_count",
|
||||
"fieldName": "pty_country",
|
||||
"size": 16384
|
||||
},
|
||||
{
|
||||
"type": "sketchMerge",
|
||||
"name": "non_existing_col_validation",
|
||||
"fieldName": "non_existing_col",
|
||||
"size": 16384
|
||||
}
|
||||
],
|
||||
"postAggregations": [
|
||||
{
|
||||
@ -19,8 +29,7 @@
|
||||
{
|
||||
"type": "sketchEstimate",
|
||||
"name": "sketchIntersectionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "sketchSetOper",
|
||||
"name": "sketchIntersectionPostAgg",
|
||||
"func": "INTERSECT",
|
||||
@ -40,8 +49,7 @@
|
||||
{
|
||||
"type": "sketchEstimate",
|
||||
"name": "sketchAnotBPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "sketchSetOper",
|
||||
"name": "sketchAnotBUnionPostAgg",
|
||||
"func": "NOT",
|
||||
@ -61,8 +69,7 @@
|
||||
{
|
||||
"type": "sketchEstimate",
|
||||
"name": "sketchUnionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "sketchSetOper",
|
||||
"name": "sketchUnionPostAgg",
|
||||
"func": "UNION",
|
||||
@ -80,5 +87,7 @@
|
||||
}
|
||||
}
|
||||
],
|
||||
"intervals": [ "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" ]
|
||||
"intervals": [
|
||||
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
|
||||
]
|
||||
}
|
||||
|
@ -4,8 +4,18 @@
|
||||
"granularity": "ALL",
|
||||
"dimensions": [],
|
||||
"aggregations": [
|
||||
{ "type": "sketchMerge", "name": "sids_sketch_count", "fieldName": "sids_sketch", "size": 16384 },
|
||||
{ "type": "sketchMerge", "name": "non_existing_col_validation", "fieldName": "non_existing_col", "size": 16384 }
|
||||
{
|
||||
"type": "sketchMerge",
|
||||
"name": "sids_sketch_count",
|
||||
"fieldName": "sids_sketch",
|
||||
"size": 16384
|
||||
},
|
||||
{
|
||||
"type": "sketchMerge",
|
||||
"name": "non_existing_col_validation",
|
||||
"fieldName": "non_existing_col",
|
||||
"size": 16384
|
||||
}
|
||||
],
|
||||
"postAggregations": [
|
||||
{
|
||||
@ -19,8 +29,7 @@
|
||||
{
|
||||
"type": "sketchEstimate",
|
||||
"name": "sketchIntersectionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "sketchSetOper",
|
||||
"name": "sketchIntersectionPostAgg",
|
||||
"func": "INTERSECT",
|
||||
@ -40,8 +49,7 @@
|
||||
{
|
||||
"type": "sketchEstimate",
|
||||
"name": "sketchAnotBPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "sketchSetOper",
|
||||
"name": "sketchAnotBUnionPostAgg",
|
||||
"func": "NOT",
|
||||
@ -60,8 +68,7 @@
|
||||
{
|
||||
"type": "sketchEstimate",
|
||||
"name": "sketchUnionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "sketchSetOper",
|
||||
"name": "sketchUnionPostAgg",
|
||||
"func": "UNION",
|
||||
@ -79,5 +86,7 @@
|
||||
}
|
||||
}
|
||||
],
|
||||
"intervals": [ "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" ]
|
||||
"intervals": [
|
||||
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
|
||||
]
|
||||
}
|
||||
|
@ -4,8 +4,18 @@
|
||||
"granularity": "ALL",
|
||||
"dimensions": [],
|
||||
"aggregations": [
|
||||
{ "type": "thetaSketch", "name": "sketch_count", "fieldName": "pty_country", "size": 16384 },
|
||||
{ "type": "thetaSketch", "name": "non_existing_col_validation", "fieldName": "non_existing_col", "size": 16384 }
|
||||
{
|
||||
"type": "thetaSketch",
|
||||
"name": "sketch_count",
|
||||
"fieldName": "pty_country",
|
||||
"size": 16384
|
||||
},
|
||||
{
|
||||
"type": "thetaSketch",
|
||||
"name": "non_existing_col_validation",
|
||||
"fieldName": "non_existing_col",
|
||||
"size": 16384
|
||||
}
|
||||
],
|
||||
"postAggregations": [
|
||||
{
|
||||
@ -19,8 +29,7 @@
|
||||
{
|
||||
"type": "thetaSketchEstimate",
|
||||
"name": "sketchIntersectionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "thetaSketchSetOp",
|
||||
"name": "sketchIntersectionPostAgg",
|
||||
"func": "INTERSECT",
|
||||
@ -40,8 +49,7 @@
|
||||
{
|
||||
"type": "thetaSketchEstimate",
|
||||
"name": "sketchAnotBPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "thetaSketchSetOp",
|
||||
"name": "sketchAnotBUnionPostAgg",
|
||||
"func": "NOT",
|
||||
@ -61,8 +69,7 @@
|
||||
{
|
||||
"type": "thetaSketchEstimate",
|
||||
"name": "sketchUnionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "thetaSketchSetOp",
|
||||
"name": "sketchUnionPostAgg",
|
||||
"func": "UNION",
|
||||
@ -80,5 +87,7 @@
|
||||
}
|
||||
}
|
||||
],
|
||||
"intervals": [ "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" ]
|
||||
"intervals": [
|
||||
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
|
||||
]
|
||||
}
|
||||
|
@ -1,16 +1,22 @@
|
||||
{
|
||||
"type" : "string",
|
||||
"parseSpec" : {
|
||||
"format" : "tsv",
|
||||
"timestampSpec" : {
|
||||
"column" : "timestamp",
|
||||
"format" : "yyyyMMddHH"
|
||||
},
|
||||
"dimensionsSpec" : {
|
||||
"dimensions": ["product"],
|
||||
"dimensionExclusions" : [],
|
||||
"spatialDimensions" : []
|
||||
},
|
||||
"columns": ["timestamp", "product", "pty_country"]
|
||||
}
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "tsv",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "yyyyMMddHH"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"product"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
},
|
||||
"columns": [
|
||||
"timestamp",
|
||||
"product",
|
||||
"pty_country"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,23 @@
|
||||
{
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "tsv",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "yyyyMMddHH"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"product",
|
||||
"pty_country"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
},
|
||||
"columns": [
|
||||
"timestamp",
|
||||
"product",
|
||||
"pty_country"
|
||||
]
|
||||
}
|
||||
}
|
@ -4,8 +4,18 @@
|
||||
"granularity": "ALL",
|
||||
"dimensions": [],
|
||||
"aggregations": [
|
||||
{ "type": "thetaSketch", "name": "sids_sketch_count", "fieldName": "sids_sketch", "size": 16384 },
|
||||
{ "type": "thetaSketch", "name": "non_existing_col_validation", "fieldName": "non_existing_col", "size": 16384 }
|
||||
{
|
||||
"type": "thetaSketch",
|
||||
"name": "sids_sketch_count",
|
||||
"fieldName": "sids_sketch",
|
||||
"size": 16384
|
||||
},
|
||||
{
|
||||
"type": "thetaSketch",
|
||||
"name": "non_existing_col_validation",
|
||||
"fieldName": "non_existing_col",
|
||||
"size": 16384
|
||||
}
|
||||
],
|
||||
"postAggregations": [
|
||||
{
|
||||
@ -19,8 +29,7 @@
|
||||
{
|
||||
"type": "thetaSketchEstimate",
|
||||
"name": "sketchIntersectionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "thetaSketchSetOp",
|
||||
"name": "sketchIntersectionPostAgg",
|
||||
"func": "INTERSECT",
|
||||
@ -40,8 +49,7 @@
|
||||
{
|
||||
"type": "thetaSketchEstimate",
|
||||
"name": "sketchAnotBPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "thetaSketchSetOp",
|
||||
"name": "sketchAnotBUnionPostAgg",
|
||||
"func": "NOT",
|
||||
@ -60,8 +68,7 @@
|
||||
{
|
||||
"type": "thetaSketchEstimate",
|
||||
"name": "sketchUnionPostAggEstimate",
|
||||
"field":
|
||||
{
|
||||
"field": {
|
||||
"type": "thetaSketchSetOp",
|
||||
"name": "sketchUnionPostAgg",
|
||||
"func": "UNION",
|
||||
@ -79,5 +86,7 @@
|
||||
}
|
||||
}
|
||||
],
|
||||
"intervals": [ "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" ]
|
||||
"intervals": [
|
||||
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
|
||||
]
|
||||
}
|
||||
|
@ -1,16 +1,22 @@
|
||||
{
|
||||
"type" : "string",
|
||||
"parseSpec" : {
|
||||
"format" : "tsv",
|
||||
"timestampSpec" : {
|
||||
"column" : "timestamp",
|
||||
"format" : "yyyyMMddHH"
|
||||
},
|
||||
"dimensionsSpec" : {
|
||||
"dimensions": ["product"],
|
||||
"dimensionExclusions" : [],
|
||||
"spatialDimensions" : []
|
||||
},
|
||||
"columns": ["timestamp", "product", "sketch"]
|
||||
}
|
||||
"type": "string",
|
||||
"parseSpec": {
|
||||
"format": "tsv",
|
||||
"timestampSpec": {
|
||||
"column": "timestamp",
|
||||
"format": "yyyyMMddHH"
|
||||
},
|
||||
"dimensionsSpec": {
|
||||
"dimensions": [
|
||||
"product"
|
||||
],
|
||||
"dimensionExclusions": [],
|
||||
"spatialDimensions": []
|
||||
},
|
||||
"columns": [
|
||||
"timestamp",
|
||||
"product",
|
||||
"sketch"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user