druid aggregators based on datasketches lib http://datasketches.github.io/

This commit is contained in:
Himanshu Gupta 2015-10-30 13:10:08 -05:00
parent 07157f9e9d
commit 817cf41f5c
28 changed files with 8920 additions and 0 deletions

View File

@ -0,0 +1,9 @@
This module provides druid aggregators based on http://datasketches.github.io/ .
Credits: This module is a result of feedback and work done by following people.
https://github.com/cheddar
https://github.com/himanshug
https://github.com/leerho
https://github.com/will-lauer

View File

@ -0,0 +1,119 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. Metamarkets licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-datasketches</artifactId>
<name>druid-datasketches</name>
<description>Druid Aggregators based on datasketches lib http://datasketches.github.io/</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.9.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>com.yahoo.datasketches</groupId>
<artifactId>sketches-core</artifactId>
<version>0.1.1</version>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${druid.api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-guava</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-smile</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-smile-provider</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<!-- Test Dependencies -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,71 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import io.druid.query.aggregation.Aggregator;
public class EmptySketchAggregator implements Aggregator
{
private final String name;
public EmptySketchAggregator(String name)
{
this.name = name;
}
@Override
public void aggregate()
{
}
@Override
public void reset()
{
}
@Override
public Object get()
{
return SketchOperations.EMPTY_SKETCH;
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import io.druid.query.aggregation.BufferAggregator;
import java.nio.ByteBuffer;
public class EmptySketchBufferAggregator implements BufferAggregator
{
public EmptySketchBufferAggregator()
{
}
@Override
public void init(ByteBuffer buf, int position)
{
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
}
@Override
public Object get(ByteBuffer buf, int position)
{
return SketchOperations.EMPTY_SKETCH;
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close()
{
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOpReturnState;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.Aggregator;
import io.druid.segment.ObjectColumnSelector;
public class SketchAggregator implements Aggregator
{
private final ObjectColumnSelector selector;
private final String name;
private final int size;
private Union union;
public SketchAggregator(String name, ObjectColumnSelector selector, int size)
{
this.name = name;
this.selector = selector;
this.size = size;
union = new SynchronizedUnion((Union) SetOperation.builder().build(size, Family.UNION));
}
@Override
public void aggregate()
{
Object update = selector.get();
if(update == null) {
return;
}
SetOpReturnState success;
if (update instanceof Memory) {
success = union.update((Memory) update);
} else {
success = union.update((Sketch) update);
}
if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch Aggregation failed with state " + success);
}
}
@Override
public void reset()
{
union.reset();
}
@Override
public Object get()
{
//in the code below, I am returning SetOp.getResult(true, null)
//"true" returns an ordered sketch but slower to compute than unordered sketch.
//however, advantage of ordered sketch is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return union.getResult(true, null);
}
@Override
public float getFloat()
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getLong()
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public String getName()
{
return name;
}
@Override
public void close()
{
union = null;
}
}

View File

@ -0,0 +1,220 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Doubles;
import com.metamx.common.IAE;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ColumnSelectorFactory;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
public abstract class SketchAggregatorFactory implements AggregatorFactory
{
public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;
protected final String name;
protected final String fieldName;
protected final int size;
private final byte cacheId;
public static final Comparator<Sketch> COMPARATOR = new Comparator<Sketch>()
{
@Override
public int compare(Sketch o, Sketch o1)
{
return Doubles.compare(o.getEstimate(), o1.getEstimate());
}
};
public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId)
{
this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");
this.size = size == null ? DEFAULT_MAX_SKETCH_SIZE : size;
Util.checkIfPowerOf2(this.size, "size");
this.cacheId = cacheId;
}
@SuppressWarnings("unchecked")
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new EmptySketchAggregator(name);
} else {
return new SketchAggregator(name, selector, size);
}
}
@SuppressWarnings("unchecked")
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ObjectColumnSelector selector = metricFactory.makeObjectColumnSelector(fieldName);
if (selector == null) {
return new EmptySketchBufferAggregator();
} else {
return new SketchBufferAggregator(selector, size, getMaxIntermediateSize());
}
}
@Override
public Object deserialize(Object object)
{
return SketchOperations.deserialize(object);
}
@Override
public Comparator<Sketch> getComparator()
{
return COMPARATOR;
}
@Override
public Object combine(Object lhs, Object rhs)
{
Union union = (Union) SetOperation.builder().build(size, Family.UNION);
updateUnion(union, lhs);
updateUnion(union, rhs);
return union.getResult(false, null);
}
private void updateUnion(Union union, Object obj)
{
if (obj == null) {
return;
} else if (obj instanceof Memory) {
union.update((Memory) obj);
} else if (obj instanceof Sketch) {
union.update((Sketch) obj);
} else {
throw new IAE("Object of type [%s] can not be unioned", obj.getClass().getName());
}
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFieldName()
{
return fieldName;
}
@JsonProperty
public int getSize()
{
return size;
}
@Override
public int getMaxIntermediateSize()
{
return SetOperation.getMaxUnionBytes(size);
}
@Override
public Object getAggregatorStartValue()
{
return Sketches.updateSketchBuilder().build(size);
}
@Override
public List<String> requiredFields()
{
return Collections.singletonList(fieldName);
}
@Override
public byte[] getCacheKey()
{
byte[] fieldNameBytes = fieldName.getBytes();
return ByteBuffer.allocate(1 + fieldNameBytes.length).put(cacheId).put(fieldNameBytes).array();
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{"
+ "fieldName='" + fieldName + '\''
+ ", name='" + name + '\''
+ ", size=" + size
+ '}';
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SketchAggregatorFactory that = (SketchAggregatorFactory) o;
if (size != that.size) {
return false;
}
if (cacheId != that.cacheId) {
return false;
}
if (!name.equals(that.name)) {
return false;
}
return fieldName.equals(that.fieldName);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fieldName.hashCode();
result = 31 * result + size;
result = 31 * result + (int) cacheId;
return result;
}
}

View File

@ -0,0 +1,126 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.memory.MemoryRegion;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.SetOpReturnState;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.query.aggregation.BufferAggregator;
import io.druid.segment.ObjectColumnSelector;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
public class SketchBufferAggregator implements BufferAggregator
{
private final ObjectColumnSelector selector;
private final int size;
private final int maxIntermediateSize;
private NativeMemory nm;
private final Map<Integer, Union> unions = new HashMap<>(); //position in BB -> Union Object
public SketchBufferAggregator(ObjectColumnSelector selector, int size, int maxIntermediateSize)
{
this.selector = selector;
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
}
@Override
public void init(ByteBuffer buf, int position)
{
if (nm == null) {
nm = new NativeMemory(buf);
}
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
unions.put(position, (Union) SetOperation.builder().setMemory(mem).build(size, Family.UNION));
}
@Override
public void aggregate(ByteBuffer buf, int position)
{
Object update = selector.get();
if(update == null) {
return;
}
Union union = getUnion(buf, position);
SetOpReturnState success;
if (update instanceof Memory) {
success = union.update((Memory) update);
} else {
success = union.update((Sketch) update);
}
if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch Buffer Aggregation failed with state " + update);
}
}
@Override
public Object get(ByteBuffer buf, int position)
{
//in the code below, I am returning SetOp.getResult(true, null)
//"true" returns an ordered sketch but slower to compute than unordered sketch.
//however, advantage of ordered sketch is that they are faster to "union" later
//given that results from the aggregator will be combined further, it is better
//to return the ordered sketch here
return getUnion(buf, position).getResult(true, null);
}
//Note that this is not threadsafe and I don't think it needs to be
private Union getUnion(ByteBuffer buf, int position)
{
Union union = unions.get(position);
if(union == null) {
Memory mem = new MemoryRegion(nm, position, maxIntermediateSize);
union = (Union) SetOperation.wrap(mem);
unions.put(position, union);
}
return union;
}
@Override
public float getFloat(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public long getLong(ByteBuffer buf, int position)
{
throw new UnsupportedOperationException("Not implemented");
}
@Override
public void close() {
unions.clear();
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.AggregatorFactory;
import java.util.Arrays;
import java.util.List;
/**
*/
public class SketchBuildAggregatorFactory extends SketchAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 17;
@JsonCreator
public SketchBuildAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("size") Integer size
)
{
super(name, fieldName, size, CACHE_TYPE_ID);
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new SketchBuildAggregatorFactory(name, name, size);
}
@Override
public Object finalizeComputation(Object object)
{
return ((Sketch) object).getEstimate();
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Arrays.<AggregatorFactory>asList(new SketchBuildAggregatorFactory(fieldName, fieldName, size));
}
@Override
public String getTypeName()
{
return SketchModule.SKETCH_BUILD;
}
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.metamx.common.IAE;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.UpdateSketch;
import io.druid.data.input.InputRow;
import io.druid.segment.serde.ComplexMetricExtractor;
import java.util.List;
/**
*/
public class SketchBuildComplexMetricSerde extends SketchMergeComplexMetricSerde
{
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<?> extractedClass()
{
return Object.class;
}
@Override
public Object extractValue(InputRow inputRow, String metricName)
{
Object obj = inputRow.getRaw(metricName);
if (obj == null || obj instanceof Sketch || obj instanceof Memory) {
return obj;
}
UpdateSketch sketch = Sketches.updateSketchBuilder().build(4096);
if (obj instanceof String) {
sketch.update((String) obj);
} else if (obj instanceof byte[]) {
sketch.update((byte[]) obj);
} else if (obj instanceof Double) {
sketch.update(((Double) obj));
} else if (obj instanceof Integer || obj instanceof Long) {
sketch.update(((Number) obj).longValue());
} else if (obj instanceof int[]) {
sketch.update((int[]) obj);
} else if (obj instanceof long[]) {
sketch.update((long[]) obj);
} else if (obj instanceof List) {
for (Object entry : (List) obj) {
sketch.update(entry.toString());
}
} else {
throw new IAE("Unknown object type[%s] received for ingestion.", obj.getClass());
}
return sketch;
}
};
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.PostAggregator;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
public class SketchEstimatePostAggregator implements PostAggregator
{
private final String name;
private final PostAggregator field;
@JsonCreator
public SketchEstimatePostAggregator(
@JsonProperty("name") String name,
@JsonProperty("field") PostAggregator field
)
{
this.name = Preconditions.checkNotNull(name, "name is null");
this.field = Preconditions.checkNotNull(field, "field is null");
}
@Override
public Set<String> getDependentFields()
{
Set<String> dependentFields = Sets.newHashSet();
dependentFields.addAll(field.getDependentFields());
return dependentFields;
}
@Override
public Comparator<Sketch> getComparator()
{
return SketchAggregatorFactory.COMPARATOR;
}
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
Sketch sketch = (Sketch) field.compute(combinedAggregators);
return sketch.getEstimate();
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public PostAggregator getField()
{
return field;
}
@Override
public String toString()
{
return "SketchEstimatePostAggregator{" +
"name='" + name + '\'' +
", field=" + field +
"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SketchEstimatePostAggregator that = (SketchEstimatePostAggregator) o;
if (!name.equals(that.name)) {
return false;
}
return field.equals(that.field);
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + field.hashCode();
return result;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.yahoo.sketches.theta.Sketch;
import java.io.IOException;
public class SketchJsonSerializer extends JsonSerializer<Sketch>
{
@Override
public void serialize(Sketch sketch, JsonGenerator jgen, SerializerProvider provider) throws IOException, JsonProcessingException
{
jgen.writeBinary(sketch.toByteArray());
}
}

View File

@ -0,0 +1,127 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.AggregatorFactory;
import java.util.Collections;
import java.util.List;
public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
{
private static final byte CACHE_TYPE_ID = 15;
private final boolean shouldFinalize;
@JsonCreator
public SketchMergeAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("size") Integer size,
@JsonProperty("shouldFinalize") Boolean shouldFinalize
)
{
super(name, fieldName, size, CACHE_TYPE_ID);
this.shouldFinalize = (shouldFinalize == null) ? true : shouldFinalize.booleanValue();
}
@Override
public List<AggregatorFactory> getRequiredColumns()
{
return Collections.<AggregatorFactory>singletonList(new SketchMergeAggregatorFactory(fieldName, fieldName, size, shouldFinalize));
}
@Override
public AggregatorFactory getCombiningFactory()
{
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize);
}
@JsonProperty
public boolean getShouldFinalize()
{
return shouldFinalize;
}
/**
* Finalize the computation on sketch object and returns estimate from underlying
* sketch.
*
* @param object the sketch object
* @return sketch object
*/
@Override
public Object finalizeComputation(Object object)
{
if (shouldFinalize) {
return ((Sketch) object).getEstimate();
} else {
return object;
}
}
@Override
public String getTypeName()
{
return SketchModule.SET_SKETCH;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) o;
return shouldFinalize == that.shouldFinalize;
}
@Override
public int hashCode()
{
int result = super.hashCode();
result = 31 * result + (shouldFinalize ? 1 : 0);
return result;
}
@Override
public String toString()
{
return getClass().getSimpleName() + "{"
+ "fieldName='" + fieldName + '\''
+ ", name='" + name + '\''
+ ", size=" + size + '\''
+ ", shouldFinalize=" + shouldFinalize +
+ '}';
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.Sketch;
import io.druid.data.input.InputRow;
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
import java.nio.ByteBuffer;
public class SketchMergeComplexMetricSerde extends ComplexMetricSerde
{
private SketchObjectStrategy strategy = new SketchObjectStrategy();
@Override
public String getTypeName()
{
return SketchModule.SET_SKETCH;
}
@Override
public ComplexMetricExtractor getExtractor()
{
return new ComplexMetricExtractor()
{
@Override
public Class<?> extractedClass()
{
return Sketch.class;
}
@Override
public Object extractValue(InputRow inputRow, String metricName)
{
final Object object = inputRow.getRaw(metricName);
if (object == null || object instanceof Sketch || object instanceof Memory) {
return object;
}
return SketchOperations.deserialize(object);
}
};
}
@Override
public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
{
GenericIndexed<Sketch> ge = GenericIndexed.read(buffer, strategy);
builder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), ge));
return new ComplexColumnPartSerde(ge, getTypeName());
}
@Override
public ObjectStrategy<Sketch> getObjectStrategy()
{
return strategy;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.yahoo.sketches.theta.Sketch;
import io.druid.initialization.DruidModule;
import io.druid.segment.serde.ComplexMetrics;
import java.util.Arrays;
import java.util.List;
public class SketchModule implements DruidModule
{
public static final String SET_SKETCH = "setSketch";
public static final String SKETCH_BUILD = "sketchBuild";
@Override
public void configure(Binder binder)
{
//gives the extractor to ingest non-sketch input data
if (ComplexMetrics.getSerdeForType(SKETCH_BUILD) == null) {
ComplexMetrics.registerSerde(SKETCH_BUILD, new SketchBuildComplexMetricSerde());
}
//gives the extractor to ingest sketch input data
if (ComplexMetrics.getSerdeForType(SET_SKETCH) == null) {
ComplexMetrics.registerSerde(SET_SKETCH, new SketchMergeComplexMetricSerde());
}
}
@Override
public List<? extends Module> getJacksonModules()
{
return Arrays.<Module>asList(
new SimpleModule("SketchModule")
.registerSubtypes(
new NamedType(SketchBuildAggregatorFactory.class, "sketchBuild"),
new NamedType(SketchMergeAggregatorFactory.class, "sketchMerge"),
new NamedType(SketchEstimatePostAggregator.class, "sketchEstimate"),
new NamedType(SketchSetPostAggregator.class, "sketchSetOper")
)
.addSerializer(Sketch.class, new SketchJsonSerializer())
);
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.google.common.primitives.Longs;
import com.metamx.common.IAE;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.memory.MemoryRegion;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import io.druid.segment.data.ObjectStrategy;
import java.nio.ByteBuffer;
public class SketchObjectStrategy implements ObjectStrategy
{
private static final byte[] EMPTY_BYTES = new byte[]{};
private static final Sketch EMPTY_SKETCH = Sketches.updateSketchBuilder().build().compact(true, null);
@Override
public int compare(Object s1, Object s2)
{
if (s1 instanceof Sketch) {
if (s2 instanceof Sketch) {
return SketchAggregatorFactory.COMPARATOR.compare((Sketch) s1, (Sketch) s2);
} else {
return -1;
}
}
if (s1 instanceof Memory) {
if (s2 instanceof Memory) {
Memory s1Mem = (Memory) s1;
Memory s2Mem = (Memory) s2;
// We have two Ordered Compact sketches, so just compare their last entry if they have the size.
// This is to produce a deterministic ordering, though it might not match the actual estimate
// ordering, but that's ok because this comparator is only used by GenericIndexed
int retVal = Longs.compare(s1Mem.getCapacity(), s2Mem.getCapacity());
if (retVal == 0) {
retVal = Longs.compare(s1Mem.getLong(s1Mem.getCapacity() - 8), s2Mem.getLong(s2Mem.getCapacity() - 8));
}
return retVal;
} else {
return 1;
}
}
throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1);
}
@Override
public Class<? extends Sketch> getClazz()
{
return Sketch.class;
}
@Override
public Object fromByteBuffer(ByteBuffer buffer, int numBytes)
{
if (numBytes == 0) {
return EMPTY_SKETCH;
}
return new MemoryRegion(new NativeMemory(buffer), buffer.position(), numBytes);
}
@Override
public byte[] toBytes(Object obj)
{
if (obj instanceof Sketch) {
Sketch sketch = (Sketch) obj;
if (sketch.isEmpty()) {
return EMPTY_BYTES;
}
return sketch.toByteArray();
} else if (obj instanceof Memory) {
Memory mem = (Memory) obj;
byte[] retVal = new byte[(int) mem.getCapacity()];
mem.getByteArray(0, retVal, 0, (int) mem.getCapacity());
return retVal;
} else if (obj == null) {
return EMPTY_BYTES;
} else {
throw new IAE("Unknown class[%s], toString[%s]", obj.getClass(), obj);
}
}
}

View File

@ -0,0 +1,129 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.google.common.base.Charsets;
import com.metamx.common.logger.Logger;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.AnotB;
import com.yahoo.sketches.theta.Intersection;
import com.yahoo.sketches.theta.SetOpReturnState;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import org.apache.commons.codec.binary.Base64;
public class SketchOperations
{
private static final Logger LOG = new Logger(SketchOperations.class);
public static final Sketch EMPTY_SKETCH = Sketches.updateSketchBuilder().build().compact(true, null);
public static enum Func
{
UNION,
INTERSECT,
NOT;
}
public static Sketch deserialize(Object serializedSketch)
{
if (serializedSketch instanceof String) {
return deserializeFromBase64EncodedString((String) serializedSketch);
} else if (serializedSketch instanceof byte[]) {
return deserializeFromByteArray((byte[]) serializedSketch);
} else if (serializedSketch instanceof Sketch) {
return (Sketch) serializedSketch;
}
throw new IllegalStateException(
"Object is not of a type that can deserialize to sketch: "
+ serializedSketch.getClass()
);
}
public static Sketch deserializeFromBase64EncodedString(String str)
{
return deserializeFromByteArray(
Base64.decodeBase64(
str.getBytes(Charsets.UTF_8)
)
);
}
public static Sketch deserializeFromByteArray(byte[] data)
{
NativeMemory mem = new NativeMemory(data);
if(Sketch.getSerializationVersion(mem) < 3) {
return Sketches.heapifySketch(mem);
} else {
return Sketches.wrapSketch(mem);
}
}
public static Sketch sketchSetOperation(Func func, int sketchSize, Sketch... sketches)
{
//in the code below, I am returning SetOp.getResult(false, null)
//"false" gets us an unordered sketch which is faster to build
//"true" returns an ordered sketch but slower to compute. advantage of ordered sketch
//is that they are faster to "union" later but given that this method is used in
//the final stages of query processing, ordered sketch would be of no use.
switch (func) {
case UNION:
Union union = (Union) SetOperation.builder().build(sketchSize, Family.UNION);
for(Sketch sketch : sketches) {
SetOpReturnState success = union.update(sketch);
if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch operation failed " + func);
}
}
return union.getResult(false, null);
case INTERSECT:
Intersection intersection = (Intersection) SetOperation.builder().build(sketchSize, Family.INTERSECTION);
for(Sketch sketch : sketches) {
SetOpReturnState success = intersection.update(sketch);
if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch operation failed " + func);
}
}
return intersection.getResult(false, null);
case NOT:
if(sketches.length < 2) {
throw new IllegalArgumentException("A-Not-B requires atleast 2 sketches");
}
Sketch result = sketches[0];
for (int i = 1; i < sketches.length; i++) {
AnotB anotb = (AnotB) SetOperation.builder().build(sketchSize, Family.A_NOT_B);
SetOpReturnState success = anotb.update(result, sketches[i]);
if(success != SetOpReturnState.Success) {
throw new IllegalStateException("Sketch operation failed " + func);
}
result = anotb.getResult(false, null);
}
return result;
default:
throw new IllegalArgumentException("Unknown sketch operation " + func);
}
}
}

View File

@ -0,0 +1,160 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import io.druid.query.aggregation.datasketches.SketchOperations.Func;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import com.metamx.common.IAE;
import com.metamx.common.logger.Logger;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.theta.Sketch;
import io.druid.query.aggregation.PostAggregator;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class SketchSetPostAggregator implements PostAggregator
{
private static final Logger LOG = new Logger(SketchSetPostAggregator.class);
private final String name;
private final List<PostAggregator> fields;
private final Func func;
private final int maxSketchSize;
@JsonCreator
public SketchSetPostAggregator(
@JsonProperty("name") String name,
@JsonProperty("func") String func,
@JsonProperty("size") Integer maxSize,
@JsonProperty("fields") List<PostAggregator> fields
)
{
this.name = name;
this.fields = fields;
this.func = SketchOperations.Func.valueOf(func);
this.maxSketchSize = maxSize == null ? SketchAggregatorFactory.DEFAULT_MAX_SKETCH_SIZE : maxSize;
Util.checkIfPowerOf2(this.maxSketchSize, "size");
if (fields.size() <= 1) {
throw new IAE("Illegal number of fields[%s], must be > 1", fields.size());
}
}
@Override
public Set<String> getDependentFields()
{
Set<String> dependentFields = Sets.newLinkedHashSet();
for (PostAggregator field : fields) {
dependentFields.addAll(field.getDependentFields());
}
return dependentFields;
}
@Override
public Comparator<Sketch> getComparator()
{
return SketchAggregatorFactory.COMPARATOR;
}
@Override
public Object compute(final Map<String, Object> combinedAggregators)
{
Sketch[] sketches = new Sketch[fields.size()];
for (int i = 0; i < sketches.length; i++) {
sketches[i] = (Sketch) fields.get(i).compute(combinedAggregators);
}
return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches);
}
@Override
@JsonProperty
public String getName()
{
return name;
}
@JsonProperty
public String getFunc()
{
return func.toString();
}
@JsonProperty
public List<PostAggregator> getFields()
{
return fields;
}
@JsonProperty
public int getSize()
{
return maxSketchSize;
}
@Override
public String toString()
{
return "SketchSetPostAggregator{" + "name='" + name + '\'' + ", fields=" + fields + ", func=" + func + ", size=" + maxSketchSize +"}";
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SketchSetPostAggregator that = (SketchSetPostAggregator) o;
if (maxSketchSize != that.maxSketchSize) {
return false;
}
if (!name.equals(that.name)) {
return false;
}
if (!fields.equals(that.fields)) {
return false;
}
return func == that.func;
}
@Override
public int hashCode()
{
int result = name.hashCode();
result = 31 * result + fields.hashCode();
result = 31 * result + func.hashCode();
result = 31 * result + maxSketchSize;
return result;
}
}

View File

@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.yahoo.sketches.memory.Memory;
import com.yahoo.sketches.theta.CompactSketch;
import com.yahoo.sketches.theta.SetOpReturnState;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
/**
*/
public class SynchronizedUnion implements Union
{
private final Union delegate;
public SynchronizedUnion(Union delegate)
{
this.delegate = delegate;
}
@Override
public synchronized SetOpReturnState update(Sketch sketch)
{
return delegate.update(sketch);
}
@Override
public synchronized SetOpReturnState update(Memory memory)
{
return delegate.update(memory);
}
@Override
public synchronized CompactSketch getResult(boolean b, Memory memory)
{
return delegate.getResult(b, memory);
}
@Override
public synchronized byte[] toByteArray()
{
return delegate.toByteArray();
}
@Override
public synchronized void reset()
{
delegate.reset();
}
}

View File

@ -0,0 +1 @@
io.druid.query.aggregation.datasketches.SketchModule

View File

@ -0,0 +1,209 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import io.druid.data.input.MapBasedRow;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
*/
public class SketchAggregationTest
{
private final AggregationTestHelper helper;
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
public SketchAggregationTest()
{
SketchModule sm = new SketchModule();
sm.configure(null);
helper = new AggregationTestHelper(sm.getJacksonModules(), tempFolder);
}
@Test
public void testSimpleDataIngestAndQuery() throws Exception
{
Sequence seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
readFileFromClasspathAsString("simple_test_data_record_parser.json"),
readFileFromClasspathAsString("simple_test_data_aggregators.json"),
0,
QueryGranularity.NONE,
5,
readFileFromClasspathAsString("simple_test_data_group_by_query.json")
);
List results = Sequences.toList(seq, Lists.newArrayList());
Assert.assertEquals(1, results.size());
Assert.assertEquals(
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("sketch_count", 50.0)
.put("sketchEstimatePostAgg", 50.0)
.put("sketchUnionPostAggEstimate", 50.0)
.put("sketchIntersectionPostAggEstimate", 50.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
results.get(0)
);
}
@Test
public void testSketchDataIngestAndQuery() throws Exception
{
Sequence seq = helper.createIndexAndRunQueryOnSegment(
new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()),
readFileFromClasspathAsString("sketch_test_data_record_parser.json"),
readFileFromClasspathAsString("sketch_test_data_aggregators.json"),
0,
QueryGranularity.NONE,
5,
readFileFromClasspathAsString("sketch_test_data_group_by_query.json")
);
List results = Sequences.toList(seq, Lists.newArrayList());
Assert.assertEquals(1, results.size());
Assert.assertEquals(
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("sids_sketch_count", 50.0)
.put("sketchEstimatePostAgg", 50.0)
.put("sketchUnionPostAggEstimate", 50.0)
.put("sketchIntersectionPostAggEstimate", 50.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
results.get(0)
);
}
@Test
public void testSketchMergeAggregatorFactorySerde() throws Exception
{
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, null));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, false));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true));
}
@Test
public void testSketchMergeFinalization() throws Exception
{
Sketch sketch = Sketches.updateSketchBuilder().build(128);
SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);
agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);
agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, false);
Assert.assertEquals(sketch, agg.finalizeComputation(sketch));
}
@Test
public void testSketchBuildAggregatorFactorySerde() throws Exception
{
assertAggregatorFactorySerde(new SketchBuildAggregatorFactory("name", "fieldName", 16));
}
private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception{
Assert.assertEquals(
agg,
helper.getObjectMapper().readValue(
helper.getObjectMapper().writeValueAsString(agg),
AggregatorFactory.class
)
);
}
@Test
public void testSketchEstimatePostAggregatorSerde() throws Exception
{
assertPostAggregatorSerde(
new SketchEstimatePostAggregator(
"name",
new FieldAccessPostAggregator("name", "fieldName")
)
);
}
@Test
public void testSketchSetPostAggregatorSerde() throws Exception
{
assertPostAggregatorSerde(
new SketchSetPostAggregator(
"name",
"INTERSECT",
null,
Lists.<PostAggregator>newArrayList(
new FieldAccessPostAggregator("name1", "fieldName1"),
new FieldAccessPostAggregator("name2", "fieldName2")
)
)
);
}
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception{
Assert.assertEquals(
agg,
helper.getObjectMapper().readValue(
helper.getObjectMapper().writeValueAsString(agg),
PostAggregator.class
)
);
}
public final static String readFileFromClasspathAsString(String fileName) throws IOException
{
return Files.asCharSource(
new File(SketchAggregationTest.class.getClassLoader().getResource(fileName).getFile()),
Charset.forName("UTF-8")
).read();
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,12 @@
[
{
"type": "sketchBuild",
"name": "pty_country",
"fieldName": "pty_country"
},
{
"type": "sketchBuild",
"name": "non_existing_col_validation",
"fieldName": "non_existing_col"
}
]

View File

@ -0,0 +1,84 @@
{
"queryType": "groupBy",
"dataSource": "test_datasource",
"granularity": "ALL",
"dimensions": [],
"aggregations": [
{ "type": "sketchMerge", "name": "sketch_count", "fieldName": "pty_country", "size": 16384 },
{ "type": "sketchMerge", "name": "non_existing_col_validation", "fieldName": "non_existing_col", "size": 16384 }
],
"postAggregations": [
{
"type": "sketchEstimate",
"name": "sketchEstimatePostAgg",
"field": {
"type": "fieldAccess",
"fieldName": "sketch_count"
}
},
{
"type": "sketchEstimate",
"name": "sketchIntersectionPostAggEstimate",
"field":
{
"type": "sketchSetOper",
"name": "sketchIntersectionPostAgg",
"func": "INTERSECT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
},
{
"type": "sketchEstimate",
"name": "sketchAnotBPostAggEstimate",
"field":
{
"type": "sketchSetOper",
"name": "sketchAnotBUnionPostAgg",
"func": "NOT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
},
{
"type": "sketchEstimate",
"name": "sketchUnionPostAggEstimate",
"field":
{
"type": "sketchSetOper",
"name": "sketchUnionPostAgg",
"func": "UNION",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
}
],
"intervals": [ "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" ]
}

View File

@ -0,0 +1,16 @@
{
"type" : "string",
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "timestamp",
"format" : "yyyyMMddHH"
},
"dimensionsSpec" : {
"dimensions": ["product"],
"dimensionExclusions" : [],
"spatialDimensions" : []
},
"columns": ["timestamp", "product", "pty_country"]
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,14 @@
[
{
"type": "sketchMerge",
"name": "sids_sketch",
"fieldName": "sketch",
"size": 16384
},
{
"type": "sketchMerge",
"name": "non_existing_col_validation",
"fieldName": "non_existing_col",
"size": 16384
}
]

View File

@ -0,0 +1,83 @@
{
"queryType": "groupBy",
"dataSource": "test_datasource",
"granularity": "ALL",
"dimensions": [],
"aggregations": [
{ "type": "sketchMerge", "name": "sids_sketch_count", "fieldName": "sids_sketch", "size": 16384 },
{ "type": "sketchMerge", "name": "non_existing_col_validation", "fieldName": "non_existing_col", "size": 16384 }
],
"postAggregations": [
{
"type": "sketchEstimate",
"name": "sketchEstimatePostAgg",
"field": {
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
}
},
{
"type": "sketchEstimate",
"name": "sketchIntersectionPostAggEstimate",
"field":
{
"type": "sketchSetOper",
"name": "sketchIntersectionPostAgg",
"func": "INTERSECT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
}
]
}
},
{
"type": "sketchEstimate",
"name": "sketchAnotBPostAggEstimate",
"field":
{
"type": "sketchSetOper",
"name": "sketchAnotBUnionPostAgg",
"func": "NOT",
"fields": [
{
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
}
]
}
},
{
"type": "sketchEstimate",
"name": "sketchUnionPostAggEstimate",
"field":
{
"type": "sketchSetOper",
"name": "sketchUnionPostAgg",
"func": "UNION",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sids_sketch_count"
}
]
}
}
],
"intervals": [ "2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z" ]
}

View File

@ -0,0 +1,16 @@
{
"type" : "string",
"parseSpec" : {
"format" : "tsv",
"timestampSpec" : {
"column" : "timestamp",
"format" : "yyyyMMddHH"
},
"dimensionsSpec" : {
"dimensions": ["product"],
"dimensionExclusions" : [],
"spatialDimensions" : []
},
"columns": ["timestamp", "product", "sketch"]
}
}