mirror of https://github.com/apache/druid.git
commit
43853a1d9e
|
@ -23,6 +23,7 @@ import com.google.common.base.Function;
|
|||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.metamx.common.guava.BaseSequence;
|
||||
import com.metamx.common.guava.MergeSequence;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.SequenceTestHelper;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
@ -230,17 +231,65 @@ public class OrderedMergeSequenceTest
|
|||
ordering,
|
||||
Sequences.simple(
|
||||
Lists.transform( // OMG WTF, the java type system really annoys me at times...
|
||||
seqs,
|
||||
new Function<TestSequence<T>, Sequence<T>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> apply(@Nullable TestSequence<T> input)
|
||||
{
|
||||
return input;
|
||||
}
|
||||
}
|
||||
seqs,
|
||||
new Function<TestSequence<T>, Sequence<T>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> apply(@Nullable TestSequence<T> input)
|
||||
{
|
||||
return input;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
private <T> MergeSequence<T> makeUnorderedMergedSequence(
|
||||
Ordering<T> ordering,
|
||||
List<TestSequence<T>> seqs
|
||||
)
|
||||
{
|
||||
return new MergeSequence<T>(
|
||||
ordering,
|
||||
Sequences.simple(
|
||||
Lists.transform( // OMG WTF, the java type system really annoys me at times...
|
||||
seqs,
|
||||
new Function<TestSequence<T>, Sequence<T>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> apply(@Nullable TestSequence<T> input)
|
||||
{
|
||||
return input;
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHierarchicalMerge() throws Exception
|
||||
{
|
||||
final Sequence<Integer> seq1 = makeUnorderedMergedSequence(
|
||||
Ordering.<Integer>natural(), Lists.newArrayList(
|
||||
TestSequence.create(1)
|
||||
)
|
||||
);
|
||||
|
||||
|
||||
final Sequence<Integer> seq2 = makeUnorderedMergedSequence(
|
||||
Ordering.<Integer>natural(), Lists.newArrayList(
|
||||
TestSequence.create(1)
|
||||
)
|
||||
);
|
||||
final OrderedMergeSequence<Integer> finalMerged = new OrderedMergeSequence<Integer>(
|
||||
Ordering.<Integer>natural(),
|
||||
Sequences.simple(
|
||||
Lists.<Sequence<Integer>>newArrayList(seq1, seq2)
|
||||
)
|
||||
);
|
||||
|
||||
SequenceTestHelper.testAll(finalMerged, Arrays.asList(1, 1));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,17 +22,24 @@
|
|||
package io.druid.query;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Sequence;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class UnionQueryRunner<T> implements QueryRunner<T>
|
||||
{
|
||||
private final QueryRunner<T> baseRunner;
|
||||
private final QueryToolChest<T, Query<T>> toolChest;
|
||||
|
||||
public UnionQueryRunner(QueryRunner<T> baseRunner)
|
||||
public UnionQueryRunner(
|
||||
QueryRunner<T> baseRunner,
|
||||
QueryToolChest<T, Query<T>> toolChest
|
||||
)
|
||||
{
|
||||
this.baseRunner = baseRunner;
|
||||
this.toolChest = toolChest;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -40,19 +47,21 @@ public class UnionQueryRunner<T> implements QueryRunner<T>
|
|||
{
|
||||
DataSource dataSource = query.getDataSource();
|
||||
if (dataSource instanceof UnionDataSource) {
|
||||
return Sequences.concat(
|
||||
Iterables.transform(
|
||||
((UnionDataSource) dataSource).getDataSources(),
|
||||
new Function<DataSource, Sequence<T>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> apply(DataSource singleSource)
|
||||
{
|
||||
return baseRunner.run(
|
||||
query.withDataSource(singleSource)
|
||||
);
|
||||
}
|
||||
}
|
||||
return toolChest.mergeSequences(
|
||||
Sequences.simple(
|
||||
Lists.transform(
|
||||
((UnionDataSource) dataSource).getDataSources(),
|
||||
new Function<DataSource, Sequence<T>>()
|
||||
{
|
||||
@Override
|
||||
public Sequence<T> apply(DataSource singleSource)
|
||||
{
|
||||
return baseRunner.run(
|
||||
query.withDataSource(singleSource)
|
||||
);
|
||||
}
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
} else {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.query;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.granularity.QueryGranularity;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
|
@ -41,6 +42,7 @@ import io.druid.segment.incremental.IncrementalIndex;
|
|||
import org.joda.time.DateTime;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
@ -52,6 +54,19 @@ public class QueryRunnerTestHelper
|
|||
{
|
||||
public static final String segmentId = "testSegment";
|
||||
public static final String dataSource = "testing";
|
||||
public static final UnionDataSource unionDataSource = new UnionDataSource(
|
||||
Lists.transform(
|
||||
Lists.newArrayList(dataSource, dataSource, dataSource, dataSource), new Function<String, TableDataSource>()
|
||||
{
|
||||
@Nullable
|
||||
@Override
|
||||
public TableDataSource apply(@Nullable String input)
|
||||
{
|
||||
return new TableDataSource(input);
|
||||
}
|
||||
}
|
||||
)
|
||||
);
|
||||
public static final QueryGranularity dayGran = QueryGranularity.DAY;
|
||||
public static final QueryGranularity allGran = QueryGranularity.ALL;
|
||||
public static final String providerDimension = "proVider";
|
||||
|
@ -165,6 +180,30 @@ public class QueryRunnerTestHelper
|
|||
);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Collection<?> makeUnionQueryRunners(
|
||||
QueryRunnerFactory factory
|
||||
)
|
||||
throws IOException
|
||||
{
|
||||
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
|
||||
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
|
||||
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
|
||||
return Arrays.asList(
|
||||
new Object[][]{
|
||||
{
|
||||
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId))
|
||||
},
|
||||
{
|
||||
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex))
|
||||
},
|
||||
{
|
||||
makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mergedRealtimeIndex))
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
public static <T> QueryRunner<T> makeQueryRunner(
|
||||
QueryRunnerFactory<T, Query<T>> factory,
|
||||
Segment adapter
|
||||
|
@ -178,4 +217,25 @@ public class QueryRunnerTestHelper
|
|||
factory.getToolchest()
|
||||
);
|
||||
}
|
||||
|
||||
public static <T> QueryRunner<T> makeUnionQueryRunner(
|
||||
QueryRunnerFactory<T, Query<T>> factory,
|
||||
Segment adapter
|
||||
)
|
||||
{
|
||||
return new FinalizeResultsQueryRunner<T>(
|
||||
factory.getToolchest().postMergeQueryDecoration(
|
||||
factory.getToolchest().mergeResults(
|
||||
new UnionQueryRunner<T>(
|
||||
new BySegmentQueryRunner<T>(
|
||||
segmentId, adapter.getDataInterval().getStart(),
|
||||
factory.createRunner(adapter)
|
||||
),
|
||||
factory.getToolchest()
|
||||
)
|
||||
)
|
||||
),
|
||||
factory.getToolchest()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.timeseries;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.metamx.common.guava.Sequences;
|
||||
import io.druid.query.Druids;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.LongSumAggregatorFactory;
|
||||
import io.druid.segment.TestHelper;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TimeSeriesUnionQueryRunnerTest
|
||||
{
|
||||
@Parameterized.Parameters
|
||||
public static Collection<?> constructorFeeder() throws IOException
|
||||
{
|
||||
return QueryRunnerTestHelper.makeUnionQueryRunners(
|
||||
TimeseriesQueryRunnerFactory.create()
|
||||
);
|
||||
}
|
||||
|
||||
private final QueryRunner runner;
|
||||
|
||||
public TimeSeriesUnionQueryRunnerTest(
|
||||
QueryRunner runner
|
||||
)
|
||||
{
|
||||
this.runner = runner;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnionTimeseries()
|
||||
{
|
||||
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.unionDataSource)
|
||||
.granularity(QueryRunnerTestHelper.dayGran)
|
||||
.intervals(QueryRunnerTestHelper.firstToThird)
|
||||
.aggregators(
|
||||
Arrays.<AggregatorFactory>asList(
|
||||
QueryRunnerTestHelper.rowsCount,
|
||||
new LongSumAggregatorFactory(
|
||||
"idx",
|
||||
"index"
|
||||
),
|
||||
QueryRunnerTestHelper.qualityUniques
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
|
||||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-01"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 52L, "idx", 26476L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
),
|
||||
new Result<TimeseriesResultValue>(
|
||||
new DateTime("2011-04-02"),
|
||||
new TimeseriesResultValue(
|
||||
ImmutableMap.<String, Object>of("rows", 52L, "idx", 23308L, "uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
Iterable<Result<TimeseriesResultValue>> results = Sequences.toList(
|
||||
runner.run(query),
|
||||
Lists.<Result<TimeseriesResultValue>>newArrayList()
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, results);
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,179 @@
|
|||
/*
|
||||
* Druid - a distributed column store.
|
||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or
|
||||
* modify it under the terms of the GNU General Public License
|
||||
* as published by the Free Software Foundation; either version 2
|
||||
* of the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program; if not, write to the Free Software
|
||||
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
*/
|
||||
|
||||
package io.druid.query.topn;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.druid.collections.StupidPool;
|
||||
import io.druid.query.QueryRunner;
|
||||
import io.druid.query.QueryRunnerTestHelper;
|
||||
import io.druid.query.Result;
|
||||
import io.druid.query.TestQueryRunners;
|
||||
import io.druid.query.aggregation.AggregatorFactory;
|
||||
import io.druid.query.aggregation.MaxAggregatorFactory;
|
||||
import io.druid.query.aggregation.MinAggregatorFactory;
|
||||
import io.druid.query.aggregation.PostAggregator;
|
||||
import io.druid.segment.TestHelper;
|
||||
import org.joda.time.DateTime;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class TopNUnionQueryTest
|
||||
{
|
||||
private final QueryRunner runner;
|
||||
|
||||
public TopNUnionQueryTest(
|
||||
QueryRunner runner
|
||||
)
|
||||
{
|
||||
this.runner = runner;
|
||||
}
|
||||
|
||||
@Parameterized.Parameters
|
||||
public static Collection<?> constructorFeeder() throws IOException
|
||||
{
|
||||
List<Object> retVal = Lists.newArrayList();
|
||||
retVal.addAll(
|
||||
QueryRunnerTestHelper.makeUnionQueryRunners(
|
||||
new TopNQueryRunnerFactory(
|
||||
TestQueryRunners.getPool(),
|
||||
new TopNQueryQueryToolChest(new TopNQueryConfig())
|
||||
)
|
||||
)
|
||||
);
|
||||
retVal.addAll(
|
||||
QueryRunnerTestHelper.makeUnionQueryRunners(
|
||||
new TopNQueryRunnerFactory(
|
||||
new StupidPool<ByteBuffer>(
|
||||
new Supplier<ByteBuffer>()
|
||||
{
|
||||
@Override
|
||||
public ByteBuffer get()
|
||||
{
|
||||
return ByteBuffer.allocate(2000);
|
||||
}
|
||||
}
|
||||
),
|
||||
new TopNQueryQueryToolChest(new TopNQueryConfig())
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
return retVal;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTopNUnionQuery()
|
||||
{
|
||||
TopNQuery query = new TopNQueryBuilder()
|
||||
.dataSource(QueryRunnerTestHelper.unionDataSource)
|
||||
.granularity(QueryRunnerTestHelper.allGran)
|
||||
.dimension(QueryRunnerTestHelper.providerDimension)
|
||||
.metric(QueryRunnerTestHelper.dependentPostAggMetric)
|
||||
.threshold(4)
|
||||
.intervals(QueryRunnerTestHelper.fullOnInterval)
|
||||
.aggregators(
|
||||
Lists.<AggregatorFactory>newArrayList(
|
||||
Iterables.concat(
|
||||
QueryRunnerTestHelper.commonAggregators,
|
||||
Lists.newArrayList(
|
||||
new MaxAggregatorFactory("maxIndex", "index"),
|
||||
new MinAggregatorFactory("minIndex", "index")
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
.postAggregators(
|
||||
Arrays.<PostAggregator>asList(
|
||||
QueryRunnerTestHelper.addRowsIndexConstant,
|
||||
QueryRunnerTestHelper.dependentPostAgg,
|
||||
QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg
|
||||
)
|
||||
)
|
||||
.build();
|
||||
|
||||
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
|
||||
new Result<TopNResultValue>(
|
||||
new DateTime("2011-01-12T00:00:00.000Z"),
|
||||
new TopNResultValue(
|
||||
Arrays.<Map<String, Object>>asList(
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put(QueryRunnerTestHelper.providerDimension, "total_market")
|
||||
.put("rows", 744L)
|
||||
.put("index", 862719.3151855469D)
|
||||
.put("addRowsIndexConstant", 863464.3151855469D)
|
||||
.put(QueryRunnerTestHelper.dependentPostAggMetric, 864209.3151855469D)
|
||||
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
|
||||
.put("maxIndex", 1743.9217529296875D)
|
||||
.put("minIndex", 792.3260498046875D)
|
||||
.put(
|
||||
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
|
||||
QueryRunnerTestHelper.UNIQUES_2 + 1.0
|
||||
)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put(QueryRunnerTestHelper.providerDimension, "upfront")
|
||||
.put("rows", 744L)
|
||||
.put("index", 768184.4240722656D)
|
||||
.put("addRowsIndexConstant", 768929.4240722656D)
|
||||
.put(QueryRunnerTestHelper.dependentPostAggMetric, 769674.4240722656D)
|
||||
.put("uniques", QueryRunnerTestHelper.UNIQUES_2)
|
||||
.put("maxIndex", 1870.06103515625D)
|
||||
.put("minIndex", 545.9906005859375D)
|
||||
.put(
|
||||
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
|
||||
QueryRunnerTestHelper.UNIQUES_2 + 1.0
|
||||
)
|
||||
.build(),
|
||||
ImmutableMap.<String, Object>builder()
|
||||
.put(QueryRunnerTestHelper.providerDimension, "spot")
|
||||
.put("rows", 3348L)
|
||||
.put("index", 382426.28929138184D)
|
||||
.put("addRowsIndexConstant", 385775.28929138184D)
|
||||
.put(QueryRunnerTestHelper.dependentPostAggMetric, 389124.28929138184D)
|
||||
.put("uniques", QueryRunnerTestHelper.UNIQUES_9)
|
||||
.put(
|
||||
QueryRunnerTestHelper.hyperUniqueFinalizingPostAggMetric,
|
||||
QueryRunnerTestHelper.UNIQUES_9 + 1.0
|
||||
)
|
||||
.put("maxIndex", 277.2735290527344D)
|
||||
.put("minIndex", 59.02102279663086D)
|
||||
.build()
|
||||
)
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -87,7 +87,8 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
|
|||
}
|
||||
},
|
||||
toolChest.preMergeQueryDecoration(baseClient)
|
||||
).withWaitMeasuredFromNow()
|
||||
).withWaitMeasuredFromNow(),
|
||||
toolChest
|
||||
)
|
||||
)
|
||||
),
|
||||
|
|
Loading…
Reference in New Issue