From 160d5fe6b7a927d4b1bd431a52ddb3e2bd69ecac Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Fri, 22 May 2015 17:01:49 -0500
Subject: [PATCH 1/2] a general class for testing any [complex] aggregation
 implementation

---
 .../aggregation/AggregationTestHelper.java    | 341 ++++++++++++++++++
 1 file changed, 341 insertions(+)
 create mode 100644 processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java

diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
new file mode 100644
index 00000000000..93164e6d5d1
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
@@ -0,0 +1,341 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.CharSource;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.common.guava.CloseQuietly;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import com.metamx.common.guava.Yielder;
+import com.metamx.common.guava.YieldingAccumulator;
+import io.druid.collections.StupidPool;
+import io.druid.data.input.Row;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.ConcatQueryRunner;
+import io.druid.query.FinalizeResultsQueryRunner;
+import io.druid.query.IntervalChunkingQueryRunnerDecorator;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryToolChest;
+import io.druid.query.QueryWatcher;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryEngine;
+import io.druid.query.groupby.GroupByQueryQueryToolChest;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.QueryableIndexSegment;
+import io.druid.segment.Segment;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class provides a general utility to test any Druid aggregation implementation given raw data,
+ * a parser spec, aggregator specs and a group-by query.
+ * It lets you build an index from raw data and run a group-by query on it, which simulates the query
+ * processing done inside a Druid cluster, exercises most of the aggregation code paths, and returns
+ * results that you can verify.
+ */
+public class AggregationTestHelper
+{
+  private final ObjectMapper mapper;
+  private final GroupByQueryQueryToolChest toolChest;
+  private final GroupByQueryRunnerFactory factory;
+
+  public AggregationTestHelper(List<? extends Module> jsonModulesToRegister)
+  {
+    mapper = new DefaultObjectMapper();
+
+    for(Module mod : jsonModulesToRegister) {
+      mapper.registerModule(mod);
+    }
+
+    Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
+    StupidPool<ByteBuffer> pool = new StupidPool<>(
+        new Supplier<ByteBuffer>()
+        {
+          @Override
+          public ByteBuffer get()
+          {
+            return ByteBuffer.allocate(1024 * 1024);
+          }
+        });
+
+    QueryWatcher noopQueryWatcher = new QueryWatcher()
+    {
+      @Override
+      public void registerQuery(Query query, ListenableFuture future)
+      {
+
+      }
+    };
+
+    GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
+    toolChest = new GroupByQueryQueryToolChest(
+        configSupplier, mapper, engine, pool,
+        NoopIntervalChunkingQueryRunnerDecorator()
+    );
+    factory = new GroupByQueryRunnerFactory(
+        engine,
+        noopQueryWatcher,
+        configSupplier,
+        toolChest,
+        pool
+    );
+  }
+
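+  //Convenience method covering the whole test cycle: builds a temporary
+  //segment from the raw input data, runs the given group-by query against it,
+  //and deletes the temporary segment directory afterwards, whether or not
+  //indexing and querying succeed.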
+  public Sequence<Row> createIndexAndRunQueryOnSegment(
+      File inputDataFile,
+      String parserJson,
+      String aggregators,
+      long minTimestamp,
+      QueryGranularity gran,
+      int maxRowCount,
+      String groupByQueryJson
+  ) throws Exception
+  {
+    File segmentDir = Files.createTempDir();
+    try {
+      createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount);
+      return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson);
+    } finally {
+      FileUtils.deleteDirectory(segmentDir);
+    }
+  }
+
+  public void createIndex(
+      File inputDataFile,
+      String parserJson,
+      String aggregators,
+      File outDir,
+      long minTimestamp,
+      QueryGranularity gran,
+      int maxRowCount
+  ) throws Exception
+  {
+    StringInputRowParser parser = mapper.readValue(parserJson, StringInputRowParser.class);
+
+    CharSource cs = Files.asCharSource(inputDataFile, Charset.defaultCharset());
+    Iterator<String> iter = cs.readLines().iterator();
+
+    List<AggregatorFactory> aggregatorSpecs = mapper.readValue(
+        aggregators,
+        new TypeReference<List<AggregatorFactory>>()
+        {
+        }
+    );
+
+    createIndex(
+        iter,
+        parser,
+        aggregatorSpecs.toArray(new AggregatorFactory[0]),
+        outDir,
+        minTimestamp,
+        gran,
+        true,
+        maxRowCount
+    );
+  }
+
+  public void createIndex(
+      Iterator rows,
+      InputRowParser parser,
+      final AggregatorFactory[] metrics,
+      File outDir,
+      long minTimestamp,
+      QueryGranularity gran,
+      boolean deserializeComplexMetrics,
+      int maxRowCount
+  ) throws Exception
+  {
+    try(IncrementalIndex index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount)) {
+      while (rows.hasNext()) {
+        Object row = rows.next();
+        if (row instanceof String && parser instanceof StringInputRowParser) {
+          //Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
+          //InputRowParser<String>
+          index.add(((StringInputRowParser) parser).parse((String) row));
+        } else {
+          index.add(parser.parse(row));
+        }
+      }
+      IndexMerger.persist(index, outDir, new IndexSpec());
+    }
+  }
+
+  //Simulates running the group-by query on individual segments as historicals would do, JSON-serializes
+  //the results from each segment, later deserializes and merges them, and finally returns the results
+  public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final String groupByQueryJson) throws Exception
+  {
+    return runQueryOnSegments(segmentDirs, mapper.readValue(groupByQueryJson, GroupByQuery.class));
+  }
+
+  public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final GroupByQuery query)
+  {
+    final List<QueryableIndexSegment> segments = Lists.transform(
+        segmentDirs,
+        new Function<File, QueryableIndexSegment>()
+        {
+          @Override
+          public QueryableIndexSegment apply(File segmentDir)
+          {
+            try {
+              return new QueryableIndexSegment("", IndexIO.loadIndex(segmentDir));
+            }
+            catch (IOException ex) {
+              throw Throwables.propagate(ex);
+            }
+          }
+        }
+    );
+
+    try {
+      final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner(
+          toolChest.postMergeQueryDecoration(
+              toolChest.mergeResults(
+                  toolChest.preMergeQueryDecoration(
+                      new ConcatQueryRunner(
+                          Sequences.simple(
+                              Lists.transform(
+                                  segments,
+                                  new Function<Segment, QueryRunner>()
+                                  {
+                                    @Override
+                                    public QueryRunner apply(final Segment segment)
+                                    {
+                                      try {
+                                        return makeStringSerdeQueryRunner(
+                                            mapper,
+                                            toolChest,
+                                            query,
+                                            factory.createRunner(segment)
+                                        );
+                                      }
+                                      catch (Exception ex) {
+                                        throw Throwables.propagate(ex);
+                                      }
+                                    }
+                                  }
+                              )
+                          )
+                      )
+                  )
+              )
+          ),
+          toolChest
+      );
+
+      return baseRunner.run(query, Maps.newHashMap());
+    } finally {
+      for(Segment segment : segments) {
+        CloseQuietly.close(segment);
+      }
+    }
+  }
+
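+  //Wraps the per-segment runner so that its results are round-tripped through
+  //Jackson (written to a JSON string, read back, and re-deserialized via the
+  //toolchest's metric manipulator) before being merged, so the aggregator's
+  //result serde is exercised as well.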
+  public QueryRunner makeStringSerdeQueryRunner(final ObjectMapper mapper, final QueryToolChest toolChest, final Query query, final QueryRunner baseRunner)
+  {
+    return new QueryRunner()
+    {
+      @Override
+      public Sequence run(Query query, Map map)
+      {
+        try {
+          Sequence resultSeq = baseRunner.run(query, Maps.newHashMap());
+          final Yielder yielder = resultSeq.toYielder(
+              null,
+              new YieldingAccumulator()
+              {
+                @Override
+                public Object accumulate(Object accumulated, Object in)
+                {
+                  yield();
+                  return in;
+                }
+              }
+          );
+          String resultStr = mapper.writer().writeValueAsString(yielder);
+
+          List<Row> resultRows = Lists.transform(
+              (List<Row>) mapper.readValue(
+                  resultStr, new TypeReference<List<Row>>() {}
+              ),
+              toolChest.makePreComputeManipulatorFn(
+                  query,
+                  MetricManipulatorFns.deserializing()
+              )
+          );
+          return Sequences.simple(resultRows);
+        } catch(Exception ex) {
+          throw Throwables.propagate(ex);
+        }
+      }
+    };
+  }
+
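+  //Builds a pass-through decorator: interval chunking is not needed in these
+  //tests, so the real decorator's dependencies are simply passed in as nulls
+  //and decorate() returns a runner that delegates to the given one unchanged.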
+  public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
+  {
+    return new IntervalChunkingQueryRunnerDecorator(null, null, null) {
+      @Override
+      public <T> QueryRunner<T> decorate(final QueryRunner<T> delegate,
+          QueryToolChest<T, ? extends Query<T>> toolChest) {
+        return new QueryRunner<T>() {
+          @Override
+          public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
+          {
+            return delegate.run(query, responseContext);
+          }
+        };
+      }
+    };
+  }
+
+  public ObjectMapper getObjectMapper()
+  {
+    return mapper;
+  }
+}

From 215c1ab01e139f214ba5ff3b30195c840f984b12 Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Fri, 22 May 2015 17:02:39 -0500
Subject: [PATCH 2/2] UTs for hyperUnique aggregation

---
 docs/content/querying/post-aggregations.md    |  2 +-
 .../HyperUniqueFinalizingPostAggregator.java  | 17 +++-
 .../io/druid/query/QueryRunnerTestHelper.java |  2 +-
 ...perUniqueFinalizingPostAggregatorTest.java |  2 +-
 .../HyperUniquesAggregationTest.java          | 92 +++++++++++++++++++
 .../query/groupby/GroupByQueryRunnerTest.java |  2 +-
 6 files changed, 111 insertions(+), 6 deletions(-)
 create mode 100644 processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java

diff --git a/docs/content/querying/post-aggregations.md b/docs/content/querying/post-aggregations.md
index 87bafa5b774..77c0559a829 100644
--- a/docs/content/querying/post-aggregations.md
+++ b/docs/content/querying/post-aggregations.md
@@ -82,7 +82,7 @@ Example JavaScript aggregator:
 The hyperUniqueCardinality post aggregator is used to wrap a hyperUnique object such that it can be used in post aggregations.
 
 ```json
-{ "type" : "hyperUniqueCardinality", "fieldName" : <the name field value of the hyperUnique aggregator> }
+{ "type" : "hyperUniqueCardinality", "name": <output name>, "fieldName" : <the name field value of the hyperUnique aggregator> }
 ```
 
 It can be used in a sample calculation as so:

diff --git a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java
index 63e37f4c3cf..c4d2036d5ca 100644
--- a/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java
+++ b/processing/src/main/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregator.java
@@ -19,6 +19,7 @@ package io.druid.query.aggregation.hyperloglog;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
 import io.druid.query.aggregation.PostAggregator;
@@ -30,14 +31,20 @@ import java.util.Set;
  */
 public class HyperUniqueFinalizingPostAggregator implements PostAggregator
 {
+  private final String name;
   private final String fieldName;
 
   @JsonCreator
   public HyperUniqueFinalizingPostAggregator(
+      @JsonProperty("name") String name,
       @JsonProperty("fieldName") String fieldName
   )
   {
-    this.fieldName = fieldName;
+    this.fieldName = Preconditions.checkNotNull(fieldName, "fieldName is null");
+    //Note that, in general, name shouldn't be null, we are defaulting
+    //to fieldName here just to be backward compatible with 0.7.x
+    this.name = name == null ? fieldName : name;
   }
 
   @Override
@@ -59,8 +66,14 @@ public class HyperUniqueFinalizingPostAggregator implements PostAggregator
   }
 
   @Override
-  @JsonProperty("fieldName")
+  @JsonProperty("name")
   public String getName()
+  {
+    return name;
+  }
+
+  @JsonProperty("fieldName")
+  public String getFieldName()
   {
     return fieldName;
   }

diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
index 6764130ad50..5fc340a87a7 100644
--- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
+++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
@@ -152,7 +152,7 @@ public class QueryRunnerTestHelper
   public static ArithmeticPostAggregator hyperUniqueFinalizingPostAgg = new ArithmeticPostAggregator(
       hyperUniqueFinalizingPostAggMetric,
       "+",
-      Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric), new ConstantPostAggregator(null, 1))
+      Lists.newArrayList(new HyperUniqueFinalizingPostAggregator(uniqueMetric, uniqueMetric), new ConstantPostAggregator(null, 1))
   );
 
   public static final List<AggregatorFactory> commonAggregators = Arrays.asList(

diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java
index e711b1ab156..f2098e17568 100644
--- a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java
+++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniqueFinalizingPostAggregatorTest.java
@@ -36,7 +36,7 @@ public class HyperUniqueFinalizingPostAggregatorTest
   {
     Random random = new Random(0L);
     HyperUniqueFinalizingPostAggregator postAggregator = new HyperUniqueFinalizingPostAggregator(
-        "uniques"
+        "uniques", "uniques"
     );
 
     HyperLogLogCollector collector = HyperLogLogCollector.makeLatestCollector();

diff --git a/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java
new file mode 100644
index 00000000000..e3de00c44e6
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/hyperloglog/HyperUniquesAggregationTest.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
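+    //druid.sample.tsv has three distinct values in its "market" column, so
+    //both the ingested hyperUnique metric and the hyperUniqueCardinality post
+    //aggregator are expected to estimate a cardinality of about 3.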
+*/
+
+package io.druid.query.aggregation.hyperloglog;
+
+import com.google.common.collect.Lists;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import io.druid.data.input.MapBasedRow;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.AggregatorsModule;
+import io.druid.query.aggregation.AggregationTestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class HyperUniquesAggregationTest
+{
+  @Test
+  public void testIngestAndQuery() throws Exception
+  {
+    AggregationTestHelper helper = new AggregationTestHelper(Lists.newArrayList(new AggregatorsModule()));
+
+    String metricSpec = "[{"
+        + "\"type\": \"hyperUnique\","
+        + "\"name\": \"index_hll\","
+        + "\"fieldName\": \"market\""
+        + "}]";
+
+    String parseSpec = "{"
+        + "\"type\" : \"string\","
+        + "\"parseSpec\" : {"
+        + "    \"format\" : \"tsv\","
+        + "    \"timestampSpec\" : {"
+        + "        \"column\" : \"timestamp\","
+        + "        \"format\" : \"auto\""
+        + "},"
+        + "    \"dimensionsSpec\" : {"
+        + "        \"dimensions\": [],"
+        + "        \"dimensionExclusions\" : [],"
+        + "        \"spatialDimensions\" : []"
+        + "    },"
+        + "    \"columns\": [\"timestamp\", \"market\", \"quality\", \"placement\", \"placementish\", \"index\"]"
+        + "  }"
+        + "}";
+
+    String query = "{"
+        + "\"queryType\": \"groupBy\","
+        + "\"dataSource\": \"test_datasource\","
+        + "\"granularity\": \"ALL\","
+        + "\"dimensions\": [],"
+        + "\"aggregations\": ["
+        + "  { \"type\": \"hyperUnique\", \"name\": \"index_hll\", \"fieldName\": \"index_hll\" }"
+        + "],"
+        + "\"postAggregations\": ["
+        + "  { \"type\": \"hyperUniqueCardinality\", \"name\": \"index_unique_count\", \"fieldName\": \"index_hll\" }"
+        + "],"
+        + "\"intervals\": [ \"1970/2050\" ]"
+        + "}";
+
+    Sequence seq = helper.createIndexAndRunQueryOnSegment(
+        new File(this.getClass().getClassLoader().getResource("druid.sample.tsv").getFile()),
+        parseSpec,
+        metricSpec,
+        0,
+        QueryGranularity.NONE,
+        50000,
+        query
+    );
+
+    MapBasedRow row = (MapBasedRow) Sequences.toList(seq, Lists.newArrayList()).get(0);
+    Assert.assertEquals(3.0, row.getFloatMetric("index_hll"), 0.1);
+    Assert.assertEquals(3.0, row.getFloatMetric("index_unique_count"), 0.1);
+  }
+}

diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
index 00d6186d237..efd17c81cfb 100644
--- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -289,7 +289,7 @@ public class GroupByQueryRunnerTest
         )
         .setPostAggregatorSpecs(
             Arrays.asList(
-                new HyperUniqueFinalizingPostAggregator("quality_uniques")
+                new HyperUniqueFinalizingPostAggregator("quality_uniques", "quality_uniques")
             )
         )
         .setGranularity(QueryRunnerTestHelper.allGran)