fix SketchMergeAggregatorFactory.finalizeResults, comparator and more UTs for timeseries, topN (#3613)

This commit is contained in:
Himanshu 2016-10-28 17:48:33 -05:00 committed by Fangjin Yang
parent 6a845e1f7b
commit 23a8e22836
11 changed files with 624 additions and 147 deletions

View File

@ -21,6 +21,7 @@ package io.druid.query.aggregation.datasketches.theta;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Doubles;
import com.google.common.primitives.Ints;
import com.yahoo.sketches.Family;
@ -30,7 +31,6 @@ import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.Aggregator;
import io.druid.query.aggregation.AggregatorFactory;
@ -52,14 +52,18 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
protected final int size;
private final byte cacheId;
public static final Comparator<Sketch> COMPARATOR = new Comparator<Sketch>()
{
@Override
public int compare(Sketch o, Sketch o1)
{
return Doubles.compare(o.getEstimate(), o1.getEstimate());
}
};
public static final Comparator<Object> COMPARATOR = Ordering.from(
new Comparator()
{
@Override
public int compare(Object o1, Object o2)
{
Sketch s1 = SketchAggregatorFactory.toSketch(o1);
Sketch s2 = SketchAggregatorFactory.toSketch(o2);
return Doubles.compare(s1.getEstimate(), s2.getEstimate());
}
}
).nullsFirst();
public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId)
{
@ -103,7 +107,7 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
}
@Override
public Comparator<Sketch> getComparator()
public Comparator<Object> getComparator()
{
return COMPARATOR;
}
@ -191,6 +195,17 @@ public abstract class SketchAggregatorFactory extends AggregatorFactory
.array();
}
public final static Sketch toSketch(Object obj)
{
if (obj instanceof Sketch) {
return (Sketch) obj;
} else if (obj instanceof Union) {
return ((Union) obj).getResult(true, null);
} else {
throw new IAE("Can't convert to Sketch object [%s]", obj.getClass());
}
}
@Override
public String toString()
{

View File

@ -82,7 +82,7 @@ public class SketchEstimatePostAggregator implements PostAggregator
@Override
public Object compute(Map<String, Object> combinedAggregators)
{
Sketch sketch = SketchSetPostAggregator.toSketch(field.compute(combinedAggregators));
Sketch sketch = SketchAggregatorFactory.toSketch(field.compute(combinedAggregators));
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(),

View File

@ -123,7 +123,7 @@ public class SketchMergeAggregatorFactory extends SketchAggregatorFactory
public Object finalizeComputation(Object object)
{
if (shouldFinalize) {
Sketch sketch = (Sketch) object;
Sketch sketch = SketchAggregatorFactory.toSketch(object);
if (errorBoundsStdDev != null) {
SketchEstimateWithErrorBounds result = new SketchEstimateWithErrorBounds(
sketch.getEstimate(),

View File

@ -26,7 +26,6 @@ import com.yahoo.sketches.memory.NativeMemory;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.segment.data.ObjectStrategy;
@ -41,13 +40,14 @@ public class SketchObjectStrategy implements ObjectStrategy
@Override
public int compare(Object s1, Object s2)
{
if (s1 instanceof Sketch) {
if (s2 instanceof Sketch) {
return SketchAggregatorFactory.COMPARATOR.compare((Sketch) s1, (Sketch) s2);
if (s1 instanceof Sketch || s1 instanceof Union) {
if (s2 instanceof Sketch || s2 instanceof Union) {
return SketchAggregatorFactory.COMPARATOR.compare(s1, s2);
} else {
return -1;
}
}
if (s1 instanceof Memory) {
if (s2 instanceof Memory) {
Memory s1Mem = (Memory) s1;
@ -66,6 +66,7 @@ public class SketchObjectStrategy implements ObjectStrategy
return 1;
}
}
throw new IAE("Unknwon class[%s], toString[%s]", s1.getClass(), s1);
}

View File

@ -56,6 +56,8 @@ public class SketchOperations
return deserializeFromByteArray((byte[]) serializedSketch);
} else if (serializedSketch instanceof Sketch) {
return (Sketch) serializedSketch;
} else if (serializedSketch instanceof Union) {
return ((Union) serializedSketch).getResult(true, null);
}
throw new IllegalStateException(

View File

@ -24,8 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Sets;
import com.yahoo.sketches.Util;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Union;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.PostAggregator;
@ -75,7 +73,7 @@ public class SketchSetPostAggregator implements PostAggregator
}
@Override
public Comparator<Sketch> getComparator()
public Comparator<Object> getComparator()
{
return SketchAggregatorFactory.COMPARATOR;
}
@ -85,23 +83,12 @@ public class SketchSetPostAggregator implements PostAggregator
{
Sketch[] sketches = new Sketch[fields.size()];
for (int i = 0; i < sketches.length; i++) {
sketches[i] = toSketch(fields.get(i).compute(combinedAggregators));
sketches[i] = SketchAggregatorFactory.toSketch(fields.get(i).compute(combinedAggregators));
}
return SketchOperations.sketchSetOperation(func, maxSketchSize, sketches);
}
public final static Sketch toSketch(Object obj)
{
if (obj instanceof Sketch) {
return (Sketch) obj;
} else if (obj instanceof Union) {
return ((Union) obj).getResult(true, null);
} else {
throw new IAE("Can't convert to Sketch object [%s]", obj.getClass());
}
}
@Override
@JsonProperty
public String getName()

View File

@ -21,22 +21,22 @@ package io.druid.query.aggregation.datasketches.theta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import com.yahoo.sketches.Family;
import com.yahoo.sketches.theta.SetOperation;
import com.yahoo.sketches.theta.Sketch;
import com.yahoo.sketches.theta.Sketches;
import com.yahoo.sketches.theta.Union;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
import io.druid.query.select.SelectResultValue;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
@ -47,6 +47,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
/**
@ -65,119 +66,6 @@ public class SketchAggregationTest
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(sm.getJacksonModules(), tempFolder);
}
@Test
public void testSimpleDataIngestAndGpByQuery() throws Exception
{
Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
readFileFromClasspathAsString("simple_test_data_record_parser.json"),
readFileFromClasspathAsString("simple_test_data_aggregators.json"),
0,
QueryGranularities.NONE,
5,
readFileFromClasspathAsString("simple_test_data_group_by_query.json")
);
List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList());
Assert.assertEquals(5, results.size());
Assert.assertEquals(
ImmutableList.of(
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_3")
.put("sketch_count", 38.0)
.put("sketchEstimatePostAgg", 38.0)
.put("sketchUnionPostAggEstimate", 38.0)
.put("sketchIntersectionPostAggEstimate", 38.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_1")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_2")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_4")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_5")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
)
),
results
);
}
@Test
public void testSimpleDataIngestAndSelectQuery() throws Exception
{
SketchModule sm = new SketchModule();
sm.configure(null);
AggregationTestHelper selectQueryAggregationTestHelper = AggregationTestHelper.createSelectQueryAggregationTestHelper(
sm.getJacksonModules(),
tempFolder
);
Sequence seq = selectQueryAggregationTestHelper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
readFileFromClasspathAsString("simple_test_data_record_parser.json"),
readFileFromClasspathAsString("simple_test_data_aggregators.json"),
0,
QueryGranularities.NONE,
5000,
readFileFromClasspathAsString("select_query.json")
);
Result<SelectResultValue> result = (Result<SelectResultValue>) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList()));
Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp());
Assert.assertEquals(100, result.getValue().getEvents().size());
Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country"));
}
@Test
public void testSketchDataIngestAndGpByQuery() throws Exception
{
@ -453,6 +341,34 @@ public class SketchAggregationTest
);
}
@Test
public void testSketchAggregatorFactoryComparator()
{
Comparator<Object> comparator = SketchAggregatorFactory.COMPARATOR;
Assert.assertEquals(0, comparator.compare(null, null));
Union union1 = (Union) SetOperation.builder().build(1<<4, Family.UNION);
union1.update("a");
union1.update("b");
Sketch sketch1 = union1.getResult();
Assert.assertEquals(-1, comparator.compare(null, sketch1));
Assert.assertEquals(1, comparator.compare(sketch1, null));
Union union2 = (Union) SetOperation.builder().build(1<<4, Family.UNION);
union2.update("a");
union2.update("b");
union2.update("c");
Sketch sketch2 = union2.getResult();
Assert.assertEquals(-1, comparator.compare(sketch1, sketch2));
Assert.assertEquals(-1, comparator.compare(sketch1, union2));
Assert.assertEquals(1, comparator.compare(sketch2, sketch1));
Assert.assertEquals(1, comparator.compare(sketch2, union1));
Assert.assertEquals(1, comparator.compare(union2, union1));
Assert.assertEquals(1, comparator.compare(union2, sketch1));
}
private void assertPostAggregatorSerde(PostAggregator agg) throws Exception
{
Assert.assertEquals(

View File

@ -0,0 +1,267 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.query.aggregation.datasketches.theta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularities;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregationTestHelper;
import io.druid.query.select.SelectResultValue;
import io.druid.query.timeseries.TimeseriesResultValue;
import io.druid.query.topn.DimensionAndMetricValueExtractor;
import io.druid.query.topn.TopNResultValue;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
/**
*/
public class SketchAggregationTestWithSimpleData
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
private SketchModule sm;
private File s1;
private File s2;
@Before
public void setup() throws Exception
{
sm = new SketchModule();
sm.configure(null);
AggregationTestHelper toolchest = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
sm.getJacksonModules(),
tempFolder
);
s1 = tempFolder.newFolder();
toolchest.createIndex(
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
readFileFromClasspathAsString("simple_test_data_record_parser.json"),
readFileFromClasspathAsString("simple_test_data_aggregators.json"),
s1,
0,
QueryGranularities.NONE,
5000
);
s2 = tempFolder.newFolder();
toolchest.createIndex(
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
readFileFromClasspathAsString("simple_test_data_record_parser.json"),
readFileFromClasspathAsString("simple_test_data_aggregators.json"),
s2,
0,
QueryGranularities.NONE,
5000
);
}
@Test
public void testSimpleDataIngestAndGpByQuery() throws Exception
{
AggregationTestHelper gpByQueryAggregationTestHelper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
sm.getJacksonModules(),
tempFolder
);
Sequence seq = gpByQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
readFileFromClasspathAsString("simple_test_data_group_by_query.json")
);
List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList());
Assert.assertEquals(5, results.size());
Assert.assertEquals(
ImmutableList.of(
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_3")
.put("sketch_count", 38.0)
.put("sketchEstimatePostAgg", 38.0)
.put("sketchUnionPostAggEstimate", 38.0)
.put("sketchIntersectionPostAggEstimate", 38.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_1")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_2")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_4")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
),
new MapBasedRow(
DateTime.parse("2014-10-19T00:00:00.000Z"),
ImmutableMap
.<String, Object>builder()
.put("product", "product_5")
.put("sketch_count", 42.0)
.put("sketchEstimatePostAgg", 42.0)
.put("sketchUnionPostAggEstimate", 42.0)
.put("sketchIntersectionPostAggEstimate", 42.0)
.put("sketchAnotBPostAggEstimate", 0.0)
.put("non_existing_col_validation", 0.0)
.build()
)
),
results
);
}
@Test
public void testSimpleDataIngestAndTimeseriesQuery() throws Exception
{
AggregationTestHelper timeseriesQueryAggregationTestHelper = AggregationTestHelper.createTimeseriesQueryAggregationTestHelper(
sm.getJacksonModules(),
tempFolder
);
Sequence seq = timeseriesQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
readFileFromClasspathAsString("timeseries_query.json")
);
Result<TimeseriesResultValue> result = (Result<TimeseriesResultValue>) Iterables.getOnlyElement(
Sequences.toList(seq, Lists.newArrayList())
);
Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp());
Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketch_count"), 0.01);
Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketchEstimatePostAgg"), 0.01);
Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketchUnionPostAggEstimate"), 0.01);
Assert.assertEquals(50.0, result.getValue().getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01);
Assert.assertEquals(0.0, result.getValue().getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01);
Assert.assertEquals(0.0, result.getValue().getDoubleMetric("non_existing_col_validation"), 0.01);
}
@Test
public void testSimpleDataIngestAndTopNQuery() throws Exception
{
AggregationTestHelper topNQueryAggregationTestHelper = AggregationTestHelper.createTopNQueryAggregationTestHelper(
sm.getJacksonModules(),
tempFolder
);
Sequence seq = topNQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
readFileFromClasspathAsString("topn_query.json")
);
Result<TopNResultValue> result = (Result<TopNResultValue>) Iterables.getOnlyElement(
Sequences.toList(seq, Lists.newArrayList())
);
Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp());
DimensionAndMetricValueExtractor value = Iterables.getOnlyElement(result.getValue().getValue());
Assert.assertEquals(38.0, value.getDoubleMetric("sketch_count"), 0.01);
Assert.assertEquals(38.0, value.getDoubleMetric("sketchEstimatePostAgg"), 0.01);
Assert.assertEquals(38.0, value.getDoubleMetric("sketchUnionPostAggEstimate"), 0.01);
Assert.assertEquals(38.0, value.getDoubleMetric("sketchIntersectionPostAggEstimate"), 0.01);
Assert.assertEquals(0.0, value.getDoubleMetric("sketchAnotBPostAggEstimate"), 0.01);
Assert.assertEquals(0.0, value.getDoubleMetric("non_existing_col_validation"), 0.01);
Assert.assertEquals("product_3", value.getDimensionValue("product"));
}
@Test
public void testSimpleDataIngestAndSelectQuery() throws Exception
{
SketchModule sm = new SketchModule();
sm.configure(null);
AggregationTestHelper selectQueryAggregationTestHelper = AggregationTestHelper.createSelectQueryAggregationTestHelper(
sm.getJacksonModules(),
tempFolder
);
Sequence seq = selectQueryAggregationTestHelper.runQueryOnSegments(
ImmutableList.of(s1, s2),
readFileFromClasspathAsString("select_query.json")
);
Result<SelectResultValue> result = (Result<SelectResultValue>) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList()));
Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp());
Assert.assertEquals(100, result.getValue().getEvents().size());
Assert.assertEquals("AgMDAAAazJMCAAAAAACAPzz9j7pWTMdROWGf15uY1nI=", result.getValue().getEvents().get(0).getEvent().get("pty_country"));
}
public final static String readFileFromClasspathAsString(String fileName) throws IOException
{
return Files.asCharSource(
new File(SketchAggregationTest.class.getClassLoader().getResource(fileName).getFile()),
Charset.forName("UTF-8")
).read();
}
}

View File

@ -0,0 +1,92 @@
{
"queryType": "timeseries",
"dataSource": "test_datasource",
"granularity":"ALL",
"aggregations": [
{
"type": "thetaSketch",
"name": "sketch_count",
"fieldName": "pty_country",
"size": 16384
},
{
"type": "thetaSketch",
"name": "non_existing_col_validation",
"fieldName": "non_existing_col",
"size": 16384
}
],
"postAggregations": [
{
"type": "thetaSketchEstimate",
"name": "sketchEstimatePostAgg",
"field": {
"type": "fieldAccess",
"fieldName": "sketch_count"
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchIntersectionPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchIntersectionPostAgg",
"func": "INTERSECT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchAnotBPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchAnotBUnionPostAgg",
"func": "NOT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchUnionPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchUnionPostAgg",
"func": "UNION",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
}
],
"intervals": [
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
]
}

View File

@ -0,0 +1,98 @@
{
"queryType": "topN",
"dataSource": "test_datasource",
"granularity":"ALL",
"metric": {
"type": "inverted",
"metric": "sketch_count"
},
"dimension": "product",
"threshold": 1,
"aggregations": [
{
"type": "thetaSketch",
"name": "sketch_count",
"fieldName": "pty_country",
"size": 16384
},
{
"type": "thetaSketch",
"name": "non_existing_col_validation",
"fieldName": "non_existing_col",
"size": 16384
}
],
"postAggregations": [
{
"type": "thetaSketchEstimate",
"name": "sketchEstimatePostAgg",
"field": {
"type": "fieldAccess",
"fieldName": "sketch_count"
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchIntersectionPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchIntersectionPostAgg",
"func": "INTERSECT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchAnotBPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchAnotBUnionPostAgg",
"func": "NOT",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
},
{
"type": "thetaSketchEstimate",
"name": "sketchUnionPostAggEstimate",
"field": {
"type": "thetaSketchSetOp",
"name": "sketchUnionPostAgg",
"func": "UNION",
"size": 16384,
"fields": [
{
"type": "fieldAccess",
"fieldName": "sketch_count"
},
{
"type": "fieldAccess",
"fieldName": "sketch_count"
}
]
}
}
],
"intervals": [
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
]
}

View File

@ -28,10 +28,12 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import io.druid.collections.StupidPool;
import io.druid.data.input.Row;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.StringInputRowParser;
@ -57,6 +59,12 @@ import io.druid.query.groupby.GroupByQueryRunnerTest;
import io.druid.query.select.SelectQueryEngine;
import io.druid.query.select.SelectQueryQueryToolChest;
import io.druid.query.select.SelectQueryRunnerFactory;
import io.druid.query.timeseries.TimeseriesQueryEngine;
import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import io.druid.query.topn.TopNQueryConfig;
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNQueryRunnerFactory;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
@ -74,6 +82,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@ -192,6 +201,96 @@ public class AggregationTestHelper
);
}
public static final AggregationTestHelper createTimeseriesQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
{
ObjectMapper mapper = new DefaultObjectMapper();
TimeseriesQueryQueryToolChest toolchest = new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
toolchest,
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
IndexIO indexIO = new IndexIO(
mapper,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
return new AggregationTestHelper(
mapper,
new IndexMerger(mapper, indexIO),
indexIO,
toolchest,
factory,
tempFolder,
jsonModulesToRegister
);
}
public static final AggregationTestHelper createTopNQueryAggregationTestHelper(
List<? extends Module> jsonModulesToRegister,
TemporaryFolder tempFolder
)
{
ObjectMapper mapper = new DefaultObjectMapper();
TopNQueryQueryToolChest toolchest = new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
);
TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory(
new StupidPool<>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(10*1024*1024);
}
}
),
toolchest,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
IndexIO indexIO = new IndexIO(
mapper,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
);
return new AggregationTestHelper(
mapper,
new IndexMerger(mapper, indexIO),
indexIO,
toolchest,
factory,
tempFolder,
jsonModulesToRegister
);
}
public Sequence<Row> createIndexAndRunQueryOnSegment(
File inputDataFile,
String parserJson,