mirror of https://github.com/apache/druid.git
Merge pull request #2207 from himanshug/theta_sketch_select_query
Fix bug where thetaSketch metrics did not work with select queries
This commit is contained in:
commit
aaea95ed1b
|
@ -0,0 +1,40 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class MemoryJsonSerializer extends JsonSerializer<Memory>
|
||||
{
|
||||
@Override
|
||||
public void serialize(Memory mem, JsonGenerator jgen, SerializerProvider provider)
|
||||
throws IOException, JsonProcessingException
|
||||
{
|
||||
jgen.writeBinary(SketchOperations.deserializeFromMemory(mem).toByteArray());
|
||||
}
|
||||
}
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.Module;
|
|||
import com.fasterxml.jackson.databind.jsontype.NamedType;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.inject.Binder;
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.theta.Sketch;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.segment.serde.ComplexMetrics;
|
||||
|
@ -66,7 +67,11 @@ public class SketchModule implements DruidModule
|
|||
new NamedType(SketchEstimatePostAggregator.class, THETA_SKETCH_ESTIMATE_POST_AGG),
|
||||
new NamedType(SketchSetPostAggregator.class, THETA_SKETCH_SET_OP_POST_AGG)
|
||||
)
|
||||
.addSerializer(Sketch.class, new SketchJsonSerializer())
|
||||
.addSerializer(
|
||||
Sketch.class, new SketchJsonSerializer()
|
||||
)
|
||||
.addSerializer(
|
||||
Memory.class, new MemoryJsonSerializer())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.query.aggregation.datasketches.theta;
|
|||
import com.google.common.base.Charsets;
|
||||
import com.metamx.common.logger.Logger;
|
||||
import com.yahoo.sketches.Family;
|
||||
import com.yahoo.sketches.memory.Memory;
|
||||
import com.yahoo.sketches.memory.NativeMemory;
|
||||
import com.yahoo.sketches.theta.AnotB;
|
||||
import com.yahoo.sketches.theta.Intersection;
|
||||
|
@ -72,7 +73,11 @@ public class SketchOperations
|
|||
|
||||
public static Sketch deserializeFromByteArray(byte[] data)
|
||||
{
|
||||
NativeMemory mem = new NativeMemory(data);
|
||||
return deserializeFromMemory(new NativeMemory(data));
|
||||
}
|
||||
|
||||
public static Sketch deserializeFromMemory(Memory mem)
|
||||
{
|
||||
if (Sketch.getSerializationVersion(mem) < 3) {
|
||||
return Sketches.heapifySketch(mem);
|
||||
} else {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package io.druid.query.aggregation.datasketches.theta;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.io.Files;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
|
@ -28,10 +29,12 @@ import com.yahoo.sketches.theta.Sketch;
|
|||
import com.yahoo.sketches.theta.Sketches;
|
||||
import io.druid.data.input.MapBasedRow;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.AggregationTestHelper;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.query.aggregation.post.FieldAccessPostAggregator;
|
||||
import io.druid.query.select.SelectResultValue;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
|
@ -56,11 +59,11 @@ public class SketchAggregationTest
|
|||
{
|
||||
SketchModule sm = new SketchModule();
|
||||
sm.configure(null);
|
||||
helper = new AggregationTestHelper(sm.getJacksonModules(), tempFolder);
|
||||
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(sm.getJacksonModules(), tempFolder);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSimpleDataIngestAndQuery() throws Exception
|
||||
public void testSimpleDataIngestAndGpByQuery() throws Exception
|
||||
{
|
||||
Sequence seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
|
||||
|
@ -92,7 +95,33 @@ public class SketchAggregationTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testSketchDataIngestAndQuery() throws Exception
|
||||
public void testSimpleDataIngestAndSelectQuery() throws Exception
|
||||
{
|
||||
SketchModule sm = new SketchModule();
|
||||
sm.configure(null);
|
||||
AggregationTestHelper selectQueryAggregationTestHelper = AggregationTestHelper.createSelectQueryAggregationTestHelper(
|
||||
sm.getJacksonModules(),
|
||||
tempFolder
|
||||
);
|
||||
|
||||
Sequence seq = selectQueryAggregationTestHelper.createIndexAndRunQueryOnSegment(
|
||||
new File(this.getClass().getClassLoader().getResource("simple_test_data.tsv").getFile()),
|
||||
readFileFromClasspathAsString("simple_test_data_record_parser.json"),
|
||||
readFileFromClasspathAsString("simple_test_data_aggregators.json"),
|
||||
0,
|
||||
QueryGranularity.NONE,
|
||||
5000,
|
||||
readFileFromClasspathAsString("select_query.json")
|
||||
);
|
||||
|
||||
Result<SelectResultValue> result = (Result<SelectResultValue>) Iterables.getOnlyElement(Sequences.toList(seq, Lists.newArrayList()));
|
||||
Assert.assertEquals(new DateTime("2014-10-20T00:00:00.000Z"), result.getTimestamp());
|
||||
Assert.assertEquals(100, result.getValue().getEvents().size());
|
||||
Assert.assertEquals("AgMDAAAazJMBAAAAAACAPzz9j7pWTMdR", result.getValue().getEvents().get(0).getEvent().get("pty_country"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSketchDataIngestAndGpByQuery() throws Exception
|
||||
{
|
||||
Sequence seq = helper.createIndexAndRunQueryOnSegment(
|
||||
new File(SketchAggregationTest.class.getClassLoader().getResource("sketch_test_data.tsv").getFile()),
|
||||
|
|
|
@ -55,7 +55,7 @@ public class OldApiSketchAggregationTest
|
|||
OldApiSketchModule sm = new OldApiSketchModule();
|
||||
sm.configure(null);
|
||||
|
||||
helper = new AggregationTestHelper(
|
||||
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
sm.getJacksonModules(),
|
||||
tempFolder
|
||||
);
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"queryType": "select",
|
||||
"dataSource": "test_datasource",
|
||||
"dimensions":[],
|
||||
"metrics":[],
|
||||
"granularity": "ALL",
|
||||
"intervals": [
|
||||
"2014-10-19T00:00:00.000Z/2014-10-22T00:00:00.000Z"
|
||||
],
|
||||
"pagingSpec":{"pagingIdentifiers": {}, "threshold":100}
|
||||
}
|
|
@ -43,7 +43,10 @@ public class ApproximateHistogramAggregationTest
|
|||
{
|
||||
ApproximateHistogramDruidModule module = new ApproximateHistogramDruidModule();
|
||||
module.configure(null);
|
||||
helper = new AggregationTestHelper(Lists.newArrayList(module.getJacksonModules()), tempFolder);
|
||||
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
Lists.newArrayList(module.getJacksonModules()),
|
||||
tempFolder
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -72,7 +72,7 @@ public class MultiValuedDimensionTest
|
|||
|
||||
public MultiValuedDimensionTest() throws Exception
|
||||
{
|
||||
helper = new AggregationTestHelper(
|
||||
helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
ImmutableList.<Module>of(), null
|
||||
);
|
||||
}
|
||||
|
|
|
@ -19,9 +19,14 @@
|
|||
|
||||
package io.druid.query.aggregation;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonToken;
|
||||
import com.fasterxml.jackson.core.ObjectCodec;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Suppliers;
|
||||
|
@ -30,6 +35,7 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Maps;
|
||||
import com.google.common.io.Closeables;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.metamx.common.IAE;
|
||||
import com.metamx.common.guava.CloseQuietly;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
@ -46,13 +52,17 @@ import io.druid.query.FinalizeResultsQueryRunner;
|
|||
import io.druid.query.IntervalChunkingQueryRunnerDecorator;
|
||||
import io.druid.query.Query;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerFactory;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.QueryToolChest;
|
||||
import io.druid.query.QueryWatcher;
|
||||
import io.druid.query.groupby.GroupByQuery;
|
||||
import io.druid.query.groupby.GroupByQueryConfig;
|
||||
import io.druid.query.groupby.GroupByQueryEngine;
|
||||
import io.druid.query.groupby.GroupByQueryQueryToolChest;
|
||||
import io.druid.query.groupby.GroupByQueryRunnerFactory;
|
||||
import io.druid.query.select.SelectQueryEngine;
|
||||
import io.druid.query.select.SelectQueryQueryToolChest;
|
||||
import io.druid.query.select.SelectQueryRunnerFactory;
|
||||
import io.druid.segment.IndexIO;
|
||||
import io.druid.segment.IndexMerger;
|
||||
import io.druid.segment.IndexSpec;
|
||||
|
@ -88,17 +98,39 @@ public class AggregationTestHelper
|
|||
private final ObjectMapper mapper;
|
||||
private final IndexMerger indexMerger;
|
||||
private final IndexIO indexIO;
|
||||
private final GroupByQueryQueryToolChest toolChest;
|
||||
private final GroupByQueryRunnerFactory factory;
|
||||
private final QueryToolChest toolChest;
|
||||
private final QueryRunnerFactory factory;
|
||||
|
||||
private final TemporaryFolder tempFolder;
|
||||
|
||||
public AggregationTestHelper(List<? extends Module> jsonModulesToRegister, TemporaryFolder tempFoler)
|
||||
private AggregationTestHelper(
|
||||
ObjectMapper mapper,
|
||||
IndexMerger indexMerger,
|
||||
IndexIO indexIO,
|
||||
QueryToolChest toolchest,
|
||||
QueryRunnerFactory factory,
|
||||
TemporaryFolder tempFolder,
|
||||
List<? extends Module> jsonModulesToRegister
|
||||
)
|
||||
{
|
||||
this.tempFolder = tempFoler;
|
||||
mapper = new DefaultObjectMapper();
|
||||
indexIO = TestHelper.getTestIndexIO();
|
||||
indexMerger = TestHelper.getTestIndexMerger();
|
||||
this.mapper = mapper;
|
||||
this.indexMerger = indexMerger;
|
||||
this.indexIO = indexIO;
|
||||
this.toolChest = toolchest;
|
||||
this.factory = factory;
|
||||
this.tempFolder = tempFolder;
|
||||
|
||||
for(Module mod : jsonModulesToRegister) {
|
||||
mapper.registerModule(mod);
|
||||
}
|
||||
}
|
||||
|
||||
public static final AggregationTestHelper createGroupByQueryAggregationTestHelper(
|
||||
List<? extends Module> jsonModulesToRegister,
|
||||
TemporaryFolder tempFolder
|
||||
)
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
for(Module mod : jsonModulesToRegister) {
|
||||
mapper.registerModule(mod);
|
||||
|
@ -125,17 +157,59 @@ public class AggregationTestHelper
|
|||
};
|
||||
|
||||
GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
|
||||
toolChest = new GroupByQueryQueryToolChest(
|
||||
GroupByQueryQueryToolChest toolchest = new GroupByQueryQueryToolChest(
|
||||
configSupplier, mapper, engine, pool,
|
||||
NoopIntervalChunkingQueryRunnerDecorator()
|
||||
);
|
||||
factory = new GroupByQueryRunnerFactory(
|
||||
GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
|
||||
engine,
|
||||
noopQueryWatcher,
|
||||
configSupplier,
|
||||
toolChest,
|
||||
toolchest,
|
||||
pool
|
||||
);
|
||||
|
||||
return new AggregationTestHelper(
|
||||
mapper,
|
||||
TestHelper.getTestIndexMerger(),
|
||||
TestHelper.getTestIndexIO(),
|
||||
toolchest,
|
||||
factory,
|
||||
tempFolder,
|
||||
jsonModulesToRegister
|
||||
);
|
||||
}
|
||||
|
||||
public static final AggregationTestHelper createSelectQueryAggregationTestHelper(
|
||||
List<? extends Module> jsonModulesToRegister,
|
||||
TemporaryFolder tempFolder
|
||||
)
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
||||
SelectQueryQueryToolChest toolchest = new SelectQueryQueryToolChest(
|
||||
new DefaultObjectMapper(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
);
|
||||
|
||||
SelectQueryRunnerFactory factory = new SelectQueryRunnerFactory(
|
||||
new SelectQueryQueryToolChest(
|
||||
new DefaultObjectMapper(),
|
||||
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
|
||||
),
|
||||
new SelectQueryEngine(),
|
||||
QueryRunnerTestHelper.NOOP_QUERYWATCHER
|
||||
);
|
||||
|
||||
return new AggregationTestHelper(
|
||||
mapper,
|
||||
TestHelper.getTestIndexMerger(),
|
||||
TestHelper.getTestIndexIO(),
|
||||
toolchest,
|
||||
factory,
|
||||
tempFolder,
|
||||
jsonModulesToRegister
|
||||
);
|
||||
}
|
||||
|
||||
public Sequence<Row> createIndexAndRunQueryOnSegment(
|
||||
|
@ -289,12 +363,12 @@ public class AggregationTestHelper
|
|||
|
||||
//Simulates running a query on individual segments as historicals would do, JSON-serializes the results
|
||||
//from each segment, then deserializes and merges them, and finally returns the results
|
||||
public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final String groupByQueryJson) throws Exception
|
||||
public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final String queryJson) throws Exception
|
||||
{
|
||||
return runQueryOnSegments(segmentDirs, mapper.readValue(groupByQueryJson, GroupByQuery.class));
|
||||
return runQueryOnSegments(segmentDirs, mapper.readValue(queryJson, Query.class));
|
||||
}
|
||||
|
||||
public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final GroupByQuery query)
|
||||
public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final Query query)
|
||||
{
|
||||
final List<Segment> segments = Lists.transform(
|
||||
segmentDirs,
|
||||
|
@ -322,7 +396,7 @@ public class AggregationTestHelper
|
|||
}
|
||||
}
|
||||
|
||||
public Sequence<Row> runQueryOnSegmentsObjs(final List<Segment> segments, final GroupByQuery query)
|
||||
public Sequence<Row> runQueryOnSegmentsObjs(final List<Segment> segments, final Query query)
|
||||
{
|
||||
final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner(
|
||||
toolChest.postMergeQueryDecoration(
|
||||
|
@ -385,10 +459,11 @@ public class AggregationTestHelper
|
|||
);
|
||||
String resultStr = mapper.writer().writeValueAsString(yielder);
|
||||
|
||||
TypeFactory typeFactory = mapper.getTypeFactory();
|
||||
JavaType baseType = typeFactory.constructType(toolChest.getResultTypeReference());
|
||||
|
||||
List resultRows = Lists.transform(
|
||||
(List<Row>)mapper.readValue(
|
||||
resultStr, new TypeReference<List<Row>>() {}
|
||||
),
|
||||
readQueryResultArrayFromString(resultStr),
|
||||
toolChest.makePreComputeManipulatorFn(
|
||||
query,
|
||||
MetricManipulatorFns.deserializing()
|
||||
|
@ -402,6 +477,24 @@ public class AggregationTestHelper
|
|||
};
|
||||
}
|
||||
|
||||
private List readQueryResultArrayFromString(String str) throws Exception
|
||||
{
|
||||
List result = new ArrayList();
|
||||
|
||||
JsonParser jp = mapper.getFactory().createParser(str);
|
||||
|
||||
if (jp.nextToken() != JsonToken.START_ARRAY) {
|
||||
throw new IAE("not an array [%s]", str);
|
||||
}
|
||||
|
||||
ObjectCodec objectCodec = jp.getCodec();
|
||||
|
||||
while(jp.nextToken() != JsonToken.END_ARRAY) {
|
||||
result.add(objectCodec.readValue(jp, toolChest.getResultTypeReference()));
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
|
||||
{
|
||||
return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
|
||||
|
|
|
@ -41,7 +41,10 @@ public class HyperUniquesAggregationTest
|
|||
@Test
|
||||
public void testIngestAndQuery() throws Exception
|
||||
{
|
||||
AggregationTestHelper helper = new AggregationTestHelper(Lists.newArrayList(new AggregatorsModule()), tempFolder);
|
||||
AggregationTestHelper helper = AggregationTestHelper.createGroupByQueryAggregationTestHelper(
|
||||
Lists.newArrayList(new AggregatorsModule()),
|
||||
tempFolder
|
||||
);
|
||||
|
||||
String metricSpec = "[{"
|
||||
+ "\"type\": \"hyperUnique\","
|
||||
|
|
Loading…
Reference in New Issue