Merge branch 'master' of github.com:metamx/druid

Eric Tschetter 2013-01-29 11:29:42 -06:00
commit 08d3253f3d
4 changed files with 274 additions and 2 deletions

View File: GroupByQueryQueryToolChest.java

@@ -101,10 +101,11 @@ public class GroupByQueryQueryToolChest implements QueryToolChest<Row, GroupByQuery>
         }
     );
 
+    final QueryGranularity gran = query.getGranularity();
     final IncrementalIndex index = runner.run(query).accumulate(
         new IncrementalIndex(
-            condensed.get(0).getStartMillis(),
-            query.getGranularity(),
+            gran.truncate(condensed.get(0).getStartMillis()),
+            gran,
             aggs.toArray(new AggregatorFactory[aggs.size()])
         ),
         new Accumulator<IncrementalIndex, Row>()
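
The change above anchors the merging IncrementalIndex at the start of the granularity bucket instead of at the raw start of the first condensed interval. A minimal sketch of the difference, assuming (as the new code does) that QueryGranularity.truncate(long) rounds a millisecond timestamp down to the start of its bucket:

    // Same constructor shape as the PeriodGranularity used in the tests below.
    QueryGranularity gran = new PeriodGranularity(new Period("P1D"), null, null);
    long firstMillis = new DateTime("2011-04-02T15:30:00").getMillis();

    long oldStart = firstMillis;                 // old: index anchored mid-bucket
    long newStart = gran.truncate(firstMillis);  // new: 2011-04-02T00:00:00

Presumably this keeps rows that fall earlier in the same bucket inside the index's time range when partial results are merged.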

View File: Rows.java

@@ -52,6 +52,12 @@ public class Rows
       {
         return row.getFloatMetric(metric);
       }
+
+      @Override
+      public String toString()
+      {
+        return row.toString();
+      }
     };
   }
 }
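
For context on why this commit delegates toString(): the anonymous Row wrapper otherwise inherits Object.toString(), so an assertion failure against a wrapped row prints an identity hash rather than the row's contents. A sketch of the effect, with wrap() as a hypothetical stand-in for the enclosing Rows factory method:

    Row wrapped = wrap(row);  // hypothetical name for the method enclosing this diff
    // before: wrapped.toString() -> something like "com.metamx.druid.input.Rows$1@6d06d69c"
    // after:  wrapped.toString() -> row.toString(), so JUnit failure messages show the row data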

View File: TestHelper.java

@@ -46,6 +46,16 @@ public class TestHelper
     assertResults(expectedResults, results, failMsg);
   }
+
+  public static <T> void assertExpectedObjects(Iterable<T> expectedResults, Iterable<T> results, String failMsg)
+  {
+    assertObjects(expectedResults, results, failMsg);
+  }
+
+  public static <T> void assertExpectedObjects(Iterable<T> expectedResults, Sequence<T> results, String failMsg)
+  {
+    assertObjects(expectedResults, Sequences.toList(results, Lists.<T>newArrayList()), failMsg);
+  }
 
   private static <T> void assertResults(Iterable<Result<T>> expectedResults, Iterable<Result<T>> actualResults, String failMsg)
   {
     Iterator<? extends Result> resultsIter = actualResults.iterator();
@@ -86,6 +96,46 @@ public class TestHelper
     }
   }
+
+  private static <T> void assertObjects(Iterable<T> expectedResults, Iterable<T> actualResults, String failMsg)
+  {
+    Iterator<T> resultsIter = actualResults.iterator();
+    Iterator<T> resultsIter2 = actualResults.iterator();
+    Iterator<T> expectedResultsIter = expectedResults.iterator();
+
+    while (resultsIter.hasNext() && resultsIter2.hasNext() && expectedResultsIter.hasNext()) {
+      T expectedNext = expectedResultsIter.next();
+      final T next = resultsIter.next();
+      final T next2 = resultsIter2.next();
+
+      Assert.assertEquals(failMsg, expectedNext, next);
+      Assert.assertEquals(
+          String.format("%sSecond iterator bad, multiple calls to iterator() should be safe", failMsg),
+          expectedNext,
+          next2
+      );
+    }
+
+    if (resultsIter.hasNext()) {
+      Assert.fail(
+          String.format("%sExpected resultsIter to be exhausted, next element was %s", failMsg, resultsIter.next())
+      );
+    }
+
+    if (resultsIter2.hasNext()) {
+      Assert.fail(
+          String.format("%sExpected resultsIter2 to be exhausted, next element was %s", failMsg, resultsIter2.next())
+      );
+    }
+
+    if (expectedResultsIter.hasNext()) {
+      Assert.fail(
+          String.format(
+              "%sExpected expectedResultsIter to be exhausted, next element was %s", failMsg, expectedResultsIter.next()
+          )
+      );
+    }
+  }
 
   private static void assertResult(String msg, Result<?> expected, Result actual)
   {
     Assert.assertEquals(msg, expected, actual);
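
assertObjects deliberately walks two independent iterators over the same Iterable: an Iterable built over a lazy Sequence can accidentally hand every caller the same underlying iterator, so a second iterator() call yields an already-exhausted one. A minimal sketch of the bug class the "Second iterator bad" assertion catches:

    // Broken Iterable: every iterator() call returns one shared iterator.
    final Iterator<String> shared = Arrays.asList("a", "b").iterator();
    Iterable<String> oneShot = new Iterable<String>()
    {
      @Override
      public Iterator<String> iterator()
      {
        return shared; // the second caller sees an exhausted iterator
      }
    };
    // assertObjects(Arrays.asList("a", "b"), oneShot, "") fails the
    // "multiple calls to iterator() should be safe" check on the first element.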

View File: GroupByQueryRunnerTest.java (new file)

@@ -0,0 +1,215 @@
/*
 * Druid - a distributed column store.
 * Copyright (C) 2012 Metamarkets Group Inc.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 */
package com.metamx.druid.query.group;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.Query;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryRunnerTestHelper;
import com.metamx.druid.query.dimension.DefaultDimensionSpec;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.query.segment.MultipleIntervalSegmentSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@RunWith(Parameterized.class)
public class GroupByQueryRunnerTest
{
  private final QueryRunner<Row> runner;
  private GroupByQueryRunnerFactory factory;

  @Parameterized.Parameters
  public static Collection<?> constructorFeeder() throws IOException
  {
    final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
        new GroupByQueryEngine(
            new GroupByQueryEngineConfig()
            {
              @Override
              public int getMaxIntermediateRows()
              {
                return 10000;
              }
            },
            new StupidPool<ByteBuffer>(
                new Supplier<ByteBuffer>()
                {
                  @Override
                  public ByteBuffer get()
                  {
                    return ByteBuffer.allocate(1024 * 1024);
                  }
                }
            )
        )
    );

    return Lists.newArrayList(
        Iterables.transform(
            QueryRunnerTestHelper.makeQueryRunners(factory),
            new Function<Object, Object>()
            {
              @Override
              public Object apply(@Nullable Object input)
              {
                return new Object[]{factory, ((Object[]) input)[0]};
              }
            }
        )
    );
  }
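
A note on the Parameterized wiring, for readers new to this JUnit pattern: the runner calls constructorFeeder() once and, for each Object[] in the returned collection, instantiates the test class with the array's elements as constructor arguments. Schematically:

    // for each Object[] params produced by constructorFeeder():
    //   new GroupByQueryRunnerTest((GroupByQueryRunnerFactory) params[0], (QueryRunner) params[1]);
    // then every @Test method runs once against that instance,
    // so each runner variant from makeQueryRunners gets full coverage.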
  public GroupByQueryRunnerTest(GroupByQueryRunnerFactory factory, QueryRunner runner)
  {
    this.factory = factory;
    this.runner = runner;
  }

  @Test
  public void testGroupBy()
  {
    GroupByQuery query = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird)
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("idx", "index")
            )
        )
        .setGranularity(QueryRunnerTestHelper.dayGran)
        .build();

    List<Row> expectedResults = Arrays.<Row>asList(
        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
        createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L),
        createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
        createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
        createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
        createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
        createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
        createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
        createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
        createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L),
        createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
        createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
        createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
        createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
        createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
        createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
    );

    Iterable<Row> results = Sequences.toList(runner.run(query), Lists.<Row>newArrayList());
    TestHelper.assertExpectedObjects(expectedResults, results, "");
  }
  @Test
  public void testMergeResults()
  {
    GroupByQuery.Builder builder = GroupByQuery
        .builder()
        .setDataSource(QueryRunnerTestHelper.dataSource)
        .setInterval("2011-04-02/2011-04-04")
        .setDimensions(Lists.<DimensionSpec>newArrayList(new DefaultDimensionSpec("quality", "alias")))
        .setAggregatorSpecs(
            Arrays.<AggregatorFactory>asList(
                QueryRunnerTestHelper.rowsCount,
                new LongSumAggregatorFactory("idx", "index")
            )
        )
        .setGranularity(new PeriodGranularity(new Period("P1M"), null, null));

    final GroupByQuery fullQuery = builder.build();

    QueryRunner mergedRunner = new GroupByQueryQueryToolChest().mergeResults(
        new QueryRunner<Row>()
        {
          @Override
          public Sequence run(Query<Row> query)
          {
            // simulate two daily segments
            final Query query1 = query.withQuerySegmentSpec(
                new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-02/2011-04-03")))
            );
            final Query query2 = query.withQuerySegmentSpec(
                new MultipleIntervalSegmentSpec(Lists.newArrayList(new Interval("2011-04-03/2011-04-04")))
            );
            return Sequences.concat(runner.run(query1), runner.run(query2));
          }
        }
    );

    List<Row> expectedResults = Arrays.<Row>asList(
        createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 269L),
        createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 217L),
        createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 319L),
        createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 216L),
        createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 4420L),
        createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 221L),
        createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 4416L),
        createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 177L),
        createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 243L)
    );

    TestHelper.assertExpectedObjects(expectedResults, runner.run(fullQuery), "direct");
    TestHelper.assertExpectedObjects(expectedResults, mergedRunner.run(fullQuery), "merged");
  }
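
Why the merged rows carry "rows" counts of 2 (and 6 for mezzanine and premium): with P1M granularity both simulated daily segments land in the same month bucket, so mergeResults must collapse one partial row per day per quality into a single re-aggregated row. A worked example using the automotive rows (the 2011-04-03 value is implied by the totals rather than stated in the fixtures):

    // 2011-04-02 segment: rows = 1, idx = 147   (matches testGroupBy's 04-02 automotive row)
    // 2011-04-03 segment: rows = 1, idx = 122   (implied: 269 - 147)
    // merged P1M bucket:  rows = 1 + 1 = 2, idx = 147 + 122 = 269

Note too that the merged rows are timestamped 2011-04-01, the truncated start of the month bucket, which is exactly what the GroupByQueryQueryToolChest change in this commit provides.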
  private MapBasedRow createExpectedRow(final String timestamp, Object... vals)
  {
    Preconditions.checkArgument(vals.length % 2 == 0);

    Map<String, Object> theVals = Maps.newHashMap();
    for (int i = 0; i < vals.length; i += 2) {
      theVals.put(vals[i].toString(), vals[i + 1]);
    }

    return new MapBasedRow(new DateTime(timestamp), theVals);
  }
}