From 160d5fe6b7a927d4b1bd431a52ddb3e2bd69ecac Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Fri, 22 May 2015 17:01:49 -0500
Subject: [PATCH] a general class for testing any [complex] aggregation
 implementation

---
 .../aggregation/AggregationTestHelper.java | 341 ++++++++++++++++++
 1 file changed, 341 insertions(+)
 create mode 100644 processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java

diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
new file mode 100644
index 00000000000..93164e6d5d1
--- /dev/null
+++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java
@@ -0,0 +1,341 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.query.aggregation;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.CharSource;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.metamx.common.guava.CloseQuietly;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
+import com.metamx.common.guava.Yielder;
+import com.metamx.common.guava.YieldingAccumulator;
+import io.druid.collections.StupidPool;
+import io.druid.data.input.Row;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.granularity.QueryGranularity;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.query.ConcatQueryRunner;
+import io.druid.query.FinalizeResultsQueryRunner;
+import io.druid.query.IntervalChunkingQueryRunnerDecorator;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryToolChest;
+import io.druid.query.QueryWatcher;
+import io.druid.query.groupby.GroupByQuery;
+import io.druid.query.groupby.GroupByQueryConfig;
+import io.druid.query.groupby.GroupByQueryEngine;
+import io.druid.query.groupby.GroupByQueryQueryToolChest;
+import io.druid.query.groupby.GroupByQueryRunnerFactory;
+import io.druid.segment.IndexIO;
+import io.druid.segment.IndexMerger;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.QueryableIndexSegment;
+import io.druid.segment.Segment;
+import io.druid.segment.incremental.IncrementalIndex;
+import io.druid.segment.incremental.OnheapIncrementalIndex;
+import org.apache.commons.io.FileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class provides general utility to test any Druid aggregation implementation, given raw data,
+ * a parser spec, aggregator specs and a group-by query.
+ * It allows you to create an index from the raw data and run a group-by query on it, which simulates
+ * the query processing inside a Druid cluster, exercising most of the aggregation code paths, and
+ * returns the results for you to verify.
+ */
+public class AggregationTestHelper
+{
+  private final ObjectMapper mapper;
+  private final GroupByQueryQueryToolChest toolChest;
+  private final GroupByQueryRunnerFactory factory;
+
+  public AggregationTestHelper(List<Module> jsonModulesToRegister)
+  {
+    mapper = new DefaultObjectMapper();
+
+    for (Module mod : jsonModulesToRegister) {
+      mapper.registerModule(mod);
+    }
+
+    //Wire up a self-contained group-by query stack (engine, toolchest, runner factory)
+    //backed by an on-heap buffer pool.
+    Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(new GroupByQueryConfig());
+    StupidPool<ByteBuffer> pool = new StupidPool<>(
+        new Supplier<ByteBuffer>()
+        {
+          @Override
+          public ByteBuffer get()
+          {
+            return ByteBuffer.allocate(1024 * 1024);
+          }
+        }
+    );
+
+    QueryWatcher noopQueryWatcher = new QueryWatcher()
+    {
+      @Override
+      public void registerQuery(Query query, ListenableFuture future)
+      {
+      }
+    };
+
+    GroupByQueryEngine engine = new GroupByQueryEngine(configSupplier, pool);
+    toolChest = new GroupByQueryQueryToolChest(
+        configSupplier, mapper, engine, pool,
+        NoopIntervalChunkingQueryRunnerDecorator()
+    );
+    factory = new GroupByQueryRunnerFactory(
+        engine,
+        noopQueryWatcher,
+        configSupplier,
+        toolChest,
+        pool
+    );
+  }
+
+  public Sequence<Row> createIndexAndRunQueryOnSegment(
+      File inputDataFile,
+      String parserJson,
+      String aggregators,
+      long minTimestamp,
+      QueryGranularity gran,
+      int maxRowCount,
+      String groupByQueryJson
+  ) throws Exception
+  {
+    File segmentDir = Files.createTempDir();
+    try {
+      createIndex(inputDataFile, parserJson, aggregators, segmentDir, minTimestamp, gran, maxRowCount);
+      return runQueryOnSegments(Lists.newArrayList(segmentDir), groupByQueryJson);
+    }
+    finally {
+      FileUtils.deleteDirectory(segmentDir);
+    }
+  }
+
+  public void createIndex(
+      File inputDataFile,
+      String parserJson,
+      String aggregators,
+      File outDir,
+      long minTimestamp,
+      QueryGranularity gran,
+      int maxRowCount
+  ) throws Exception
+  {
+    StringInputRowParser parser = mapper.readValue(parserJson, StringInputRowParser.class);
+
+    CharSource cs = Files.asCharSource(inputDataFile, Charset.defaultCharset());
+    Iterator<String> iter = cs.readLines().iterator();
+
+    List<AggregatorFactory> aggregatorSpecs = mapper.readValue(
+        aggregators,
+        new TypeReference<List<AggregatorFactory>>()
+        {
+        }
+    );
+
+    createIndex(
+        iter,
+        parser,
+        aggregatorSpecs.toArray(new AggregatorFactory[0]),
+        outDir,
+        minTimestamp,
+        gran,
+        true,
+        maxRowCount
+    );
+  }
+
+  public void createIndex(
+      Iterator rows,
+      InputRowParser parser,
+      final AggregatorFactory[] metrics,
+      File outDir,
+      long minTimestamp,
+      QueryGranularity gran,
+      boolean deserializeComplexMetrics,
+      int maxRowCount
+  ) throws Exception
+  {
+    try (IncrementalIndex index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, maxRowCount)) {
+      while (rows.hasNext()) {
+        Object row = rows.next();
+        if (row instanceof String && parser instanceof StringInputRowParser) {
+          //Note: this is required because StringInputRowParser is an InputRowParser<ByteBuffer>
+          //as opposed to an InputRowParser<String>
+          index.add(((StringInputRowParser) parser).parse((String) row));
+        }
+        else {
+          index.add(parser.parse(row));
+        }
+      }
+      IndexMerger.persist(index, outDir, new IndexSpec());
+    }
+  }
+
+  //Simulates running the group-by query on individual segments as historicals would, JSON-serializes the
+  //results from each segment, then deserializes and merges them, and finally returns the merged results
+  public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final String groupByQueryJson) throws Exception
+  {
+    return runQueryOnSegments(segmentDirs, mapper.readValue(groupByQueryJson, GroupByQuery.class));
+  }
+
+  public Sequence<Row> runQueryOnSegments(final List<File> segmentDirs, final GroupByQuery query)
+  {
+    final List<Segment> segments = Lists.transform(
+        segmentDirs,
+        new Function<File, Segment>()
+        {
+          @Override
+          public Segment apply(File segmentDir)
+          {
+            try {
+              return new QueryableIndexSegment("", IndexIO.loadIndex(segmentDir));
+            }
+            catch (IOException ex) {
+              throw Throwables.propagate(ex);
+            }
+          }
+        }
+    );
+
+    try {
+      final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner(
+          toolChest.postMergeQueryDecoration(
+              toolChest.mergeResults(
+                  toolChest.preMergeQueryDecoration(
+                      new ConcatQueryRunner(
+                          Sequences.simple(
+                              Lists.transform(
+                                  segments,
+                                  new Function<Segment, QueryRunner>()
+                                  {
+                                    @Override
+                                    public QueryRunner apply(final Segment segment)
+                                    {
+                                      try {
+                                        return makeStringSerdeQueryRunner(
+                                            mapper,
+                                            toolChest,
+                                            query,
+                                            factory.createRunner(segment)
+                                        );
+                                      }
+                                      catch (Exception ex) {
+                                        throw Throwables.propagate(ex);
+                                      }
+                                    }
+                                  }
+                              )
+                          )
+                      )
+                  )
+              )
+          ),
+          toolChest
+      );
+
+      return baseRunner.run(query, Maps.newHashMap());
+    }
+    finally {
+      for (Segment segment : segments) {
+        CloseQuietly.close(segment);
+      }
+    }
+  }
+
+  public QueryRunner<Row> makeStringSerdeQueryRunner(
+      final ObjectMapper mapper,
+      final QueryToolChest toolChest,
+      final Query<Row> query,
+      final QueryRunner<Row> baseRunner
+  )
+  {
+    return new QueryRunner<Row>()
+    {
+      @Override
+      public Sequence<Row> run(Query<Row> query, Map<String, Object> map)
+      {
+        try {
+          //Round-trip the per-segment results through Jackson to exercise the
+          //aggregator's result serde.
+          Sequence<Row> resultSeq = baseRunner.run(query, Maps.<String, Object>newHashMap());
+          final Yielder yielder = resultSeq.toYielder(
+              null,
+              new YieldingAccumulator()
+              {
+                @Override
+                public Object accumulate(Object accumulated, Object in)
+                {
+                  yield();
+                  return in;
+                }
+              }
+          );
+          String resultStr = mapper.writer().writeValueAsString(yielder);
+
+          List resultRows = Lists.transform(
+              (List<Row>) mapper.readValue(
+                  resultStr, new TypeReference<List<Row>>() {}
+              ),
+              toolChest.makePreComputeManipulatorFn(
+                  query,
+                  MetricManipulatorFns.deserializing()
+              )
+          );
+          return Sequences.simple(resultRows);
+        }
+        catch (Exception ex) {
+          throw Throwables.propagate(ex);
+        }
+      }
+    };
+  }
+
+  public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
+  {
+    return new IntervalChunkingQueryRunnerDecorator(null, null, null)
+    {
+      @Override
+      public <T> QueryRunner<T> decorate(
+          final QueryRunner<T> delegate,
+          QueryToolChest<T, ? extends Query<T>> toolChest
+      )
+      {
+        return new QueryRunner<T>()
+        {
+          @Override
+          public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
+          {
+            return delegate.run(query, responseContext);
+          }
+        };
+      }
+    };
+  }
+
+  public ObjectMapper getObjectMapper()
+  {
+    return mapper;
+  }
+}
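
For reference, a minimal sketch of how a unit test might drive this helper. The module class, file name, and JSON strings below are hypothetical placeholders for a custom aggregation extension, not part of this patch; only the AggregationTestHelper calls themselves come from the code above:

    // MyAggregatorModule is an assumed Jackson Module that registers the custom
    // aggregator's serde; the parser/aggregators/query JSON strings are assumed inputs.
    AggregationTestHelper helper = new AggregationTestHelper(
        Lists.<Module>newArrayList(new MyAggregatorModule())
    );

    Sequence<Row> seq = helper.createIndexAndRunQueryOnSegment(
        new File("sample.data.tsv"),   // raw input data
        parserJson,                    // StringInputRowParser spec (JSON)
        aggregatorsJson,               // JSON array of AggregatorFactory specs
        0,                             // minTimestamp
        QueryGranularity.NONE,         // index granularity
        5000,                          // max rows in the incremental index
        groupByQueryJson               // the GroupByQuery to run (JSON)
    );

    // Materialize and verify; the rows have passed through the full
    // per-segment serialize/deserialize/merge path simulated by the helper.
    List<Row> results = Sequences.toList(seq, Lists.<Row>newArrayList());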