Minimize PostAggregator computations (#14708)

* Minimize PostAggregator computations

Since a change back in 2014, the topN query has been computing
all PostAggregators on all intermediate responses from leaf nodes
to brokers.  This generates significant slow downs for queries
with relatively expensive PostAggregators.  This change rewrites
the query that is pushed down to only have the minimal set of
PostAggregators such that it is impossible for downstream
processing to do too much work.  The final PostAggregators are
applied at the very end.
This commit is contained in:
imply-cheddar 2023-08-03 13:34:31 -05:00 committed by GitHub
parent 20c48b6a3d
commit 748874405c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
46 changed files with 932 additions and 919 deletions

View File

@ -59,7 +59,7 @@ import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
@ -480,8 +480,11 @@ public class CachingClusteredClientBenchmark
private <T> List<T> runQuery()
{
//noinspection unchecked
QueryRunner<T> theRunner = new FluentQueryRunnerBuilder<T>(toolChestWarehouse.getToolChest(query))
.create(cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()))
QueryRunner<T> theRunner = FluentQueryRunner
.create(
cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()),
toolChestWarehouse.getToolChest(query)
)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration();

View File

@ -25,6 +25,8 @@ import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -134,11 +136,13 @@ public class DistinctCountGroupByQueryTest extends InitializedNullHandlingTest
.build();
final Segment incrementalIndexSegment = new IncrementalIndexSegment(index, null);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(
factory,
factory.createRunner(incrementalIndexSegment),
query
);
Iterable<ResultRow> results = FluentQueryRunner
.create(factory.createRunner(incrementalIndexSegment), factory.getToolchest())
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.run(QueryPlus.wrap(query))
.toList();
List<ResultRow> expectedResults = Arrays.asList(
GroupByQueryRunnerTestHelper.createExpectedRow(

View File

@ -135,7 +135,7 @@ public class DistinctCountTopNQueryTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
time,
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
client_type, "iphone",

View File

@ -130,7 +130,7 @@ public class MapVirtualColumnTopNTest extends InitializedNullHandlingTest
final List<Result<TopNResultValue>> expected = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
ImmutableList.of(
new DimensionAndMetricValueExtractor(MapVirtualColumnTestBase.mapOf("count", 2L, "params.key3", null)),
new DimensionAndMetricValueExtractor(MapVirtualColumnTestBase.mapOf("count", 1L, "params.key3", "value3"))

View File

@ -20,9 +20,9 @@
package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -58,7 +58,7 @@ public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlin
private static final Closer RESOURCE_CLOSER = Closer.create();
private static TestGroupByBuffers BUFFER_POOLS = null;
private final QueryRunner<Row> runner;
private final QueryRunner<ResultRow> runner;
private final GroupByQueryRunnerFactory factory;
@BeforeClass
@ -142,7 +142,7 @@ public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlin
for (GroupByQueryConfig config : configs) {
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS);
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunnersToMerge(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",
config.toString(),
@ -229,7 +229,7 @@ public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlin
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
TestHelper.assertExpectedObjects(expectedResults, results, "approx-histo");
}
@ -276,7 +276,7 @@ public class ApproximateHistogramGroupByQueryTest extends InitializedNullHandlin
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
TestHelper.assertExpectedObjects(expectedResults, results, "approx-histo");
}
}

View File

@ -22,21 +22,17 @@ package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.TestQueryRunners;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNQueryRunnerTest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
@ -46,7 +42,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -66,32 +61,7 @@ public class ApproximateHistogramTopNQueryTest extends InitializedNullHandlingTe
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(2000)
);
RESOURCE_CLOSER.register(defaultPool);
RESOURCE_CLOSER.register(customPool);
return QueryRunnerTestHelper.transformToConstructionFeeder(
Iterables.concat(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
),
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
customPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
)
);
return QueryRunnerTestHelper.transformToConstructionFeeder(TopNQueryRunnerTest.queryRunners());
}
private final QueryRunner runner;
@ -145,7 +115,7 @@ public class ApproximateHistogramTopNQueryTest extends InitializedNullHandlingTe
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")

View File

@ -20,9 +20,9 @@
package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
@ -58,7 +58,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli
private static final Closer RESOURCE_CLOSER = Closer.create();
private static TestGroupByBuffers BUFFER_POOLS = null;
private final QueryRunner<Row> runner;
private final QueryRunner<ResultRow> runner;
private final GroupByQueryRunnerFactory factory;
@BeforeClass
@ -142,7 +142,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli
for (GroupByQueryConfig config : configs) {
final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config, BUFFER_POOLS);
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunnersToMerge(factory)) {
final String testName = StringUtils.format(
"config=%s, runner=%s",
config.toString(),
@ -155,6 +155,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli
return constructors;
}
@SuppressWarnings("unused")
public FixedBucketsHistogramGroupByQueryTest(
String testName,
GroupByQueryRunnerFactory factory,
@ -230,7 +231,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}
@ -277,7 +278,7 @@ public class FixedBucketsHistogramGroupByQueryTest extends InitializedNullHandli
)
);
Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
Iterable<ResultRow> results = runner.run(QueryPlus.wrap(query)).toList();
TestHelper.assertExpectedObjects(expectedResults, results, "fixed-histo");
}
}

View File

@ -22,21 +22,17 @@ package org.apache.druid.query.aggregation.histogram;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.TestQueryRunners;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNQueryRunnerTest;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
@ -46,7 +42,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -66,32 +61,7 @@ public class FixedBucketsHistogramTopNQueryTest extends InitializedNullHandlingT
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(2000)
);
RESOURCE_CLOSER.register(defaultPool);
RESOURCE_CLOSER.register(customPool);
return QueryRunnerTestHelper.transformToConstructionFeeder(
Iterables.concat(
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
),
QueryRunnerTestHelper.makeQueryRunners(
new TopNQueryRunnerFactory(
customPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
)
);
return QueryRunnerTestHelper.transformToConstructionFeeder(TopNQueryRunnerTest.queryRunners());
}
private final QueryRunner runner;
@ -145,7 +115,7 @@ public class FixedBucketsHistogramTopNQueryTest extends InitializedNullHandlingT
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")

View File

@ -56,10 +56,10 @@ public class VarianceTopNQueryTest extends InitializedNullHandlingTest
return QueryRunnerTestHelper.transformToConstructionFeeder(TopNQueryRunnerTest.queryRunners());
}
private final QueryRunner runner;
private final QueryRunner<Result<TopNResultValue>> runner;
public VarianceTopNQueryTest(
QueryRunner runner
QueryRunner<Result<TopNResultValue>> runner
)
{
this.runner = runner;
@ -92,7 +92,7 @@ public class VarianceTopNQueryTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -131,7 +131,7 @@ public class VarianceTopNQueryTest extends InitializedNullHandlingTest
assertExpectedResults(expectedResults, query);
}
private Sequence<Result<TopNResultValue>> assertExpectedResults(
private void assertExpectedResults(
Iterable<Result<TopNResultValue>> expectedResults,
TopNQuery query
)
@ -140,7 +140,6 @@ public class VarianceTopNQueryTest extends InitializedNullHandlingTest
final QueryRunner<Result<TopNResultValue>> mergeRunner = chest.mergeResults(runner);
final Sequence<Result<TopNResultValue>> retval = mergeRunner.run(QueryPlus.wrap(query));
TestHelper.assertExpectedResults(expectedResults, retval);
return retval;
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.query;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@ -29,6 +28,9 @@ import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ResponseContext;
import java.util.ArrayList;
import java.util.List;
/**
* Query runner that applies {@link QueryToolChest#makePostComputeManipulatorFn(Query, MetricManipulationFn)} to the
* result stream. It is expected to be the last runner in the pipeline, after results are fully merged.
@ -92,10 +94,16 @@ public class FinalizeResultsQueryRunner<T> implements QueryRunner<T>
BySegmentResultValue<T> resultsClass = result.getValue();
final List<T> originalResults = resultsClass.getResults();
final ArrayList<T> transformedResults = new ArrayList<>(originalResults.size());
for (T originalResult : originalResults) {
transformedResults.add(baseFinalizer.apply(originalResult));
}
return new Result<>(
result.getTimestamp(),
new BySegmentResultValueClass<>(
Lists.transform(resultsClass.getResults(), baseFinalizer),
transformedResults,
resultsClass.getSegmentId(),
resultsClass.getInterval()
)

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class FluentQueryRunner<T> implements QueryRunner<T>
{
@SuppressWarnings("unchecked")
public static <K, J extends Query<K>> FluentQueryRunner<K> create(
QueryRunner<K> runner,
QueryToolChest<K, J> toolchest
)
{
return new FluentQueryRunner<>(runner, (QueryToolChest<K, Query<K>>) toolchest);
}
private final QueryToolChest<T, Query<T>> toolChest;
private final QueryRunner<T> baseRunner;
public FluentQueryRunner(QueryRunner<T> runner, QueryToolChest<T, Query<T>> toolChest)
{
this.baseRunner = runner;
this.toolChest = toolChest;
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return baseRunner.run(queryPlus, responseContext);
}
public FluentQueryRunner<T> from(QueryRunner<T> runner)
{
return new FluentQueryRunner<T>(runner, toolChest);
}
public FluentQueryRunner<T> applyPostMergeDecoration()
{
return from(new FinalizeResultsQueryRunner<>(toolChest.postMergeQueryDecoration(baseRunner), toolChest));
}
public FluentQueryRunner<T> applyPreMergeDecoration()
{
return from(new UnionQueryRunner<>(toolChest.preMergeQueryDecoration(baseRunner)));
}
public FluentQueryRunner<T> emitCPUTimeMetric(ServiceEmitter emitter)
{
return emitCPUTimeMetric(emitter, new AtomicLong(0L));
}
public FluentQueryRunner<T> emitCPUTimeMetric(ServiceEmitter emitter, AtomicLong accumulator)
{
return from(
CPUTimeMetricQueryRunner.safeBuild(
baseRunner,
toolChest,
emitter,
accumulator,
true
)
);
}
public FluentQueryRunner<T> postProcess(PostProcessingOperator<T> postProcessing)
{
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}
public FluentQueryRunner<T> mergeResults()
{
return from(toolChest.mergeResults(baseRunner));
}
public FluentQueryRunner<T> map(final Function<QueryRunner<T>, QueryRunner<T>> mapFn)
{
return from(mapFn.apply(baseRunner));
}
/**
* Sets the toString of the QueryRunner. This is used because QueryRunner objects are often used as parameters for
* tests and the toString() value of the QueryRunners are used for the name of the test.
*
* This method doesn't return a FluentQueryRunner because it breaks the fluency.
*
* @param toStringValue the value that the resulting QueryRunner should return from its toString method.
* @return a QueryRunner that will return toStringValue from its toString method
*/
public QueryRunner<T> setToString(String toStringValue)
{
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return FluentQueryRunner.this.run(queryPlus, responseContext);
}
@Override
public String toString()
{
return toStringValue;
}
};
}
}

View File

@ -1,106 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.context.ResponseContext;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
public class FluentQueryRunnerBuilder<T>
{
final QueryToolChest<T, Query<T>> toolChest;
public FluentQueryRunner create(QueryRunner<T> baseRunner)
{
return new FluentQueryRunner(baseRunner);
}
public FluentQueryRunnerBuilder(QueryToolChest<T, Query<T>> toolChest)
{
this.toolChest = toolChest;
}
public class FluentQueryRunner implements QueryRunner<T>
{
private QueryRunner<T> baseRunner;
public FluentQueryRunner(QueryRunner<T> runner)
{
this.baseRunner = runner;
}
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return baseRunner.run(queryPlus, responseContext);
}
public FluentQueryRunner from(QueryRunner<T> runner)
{
return new FluentQueryRunner(runner);
}
public FluentQueryRunner applyPostMergeDecoration()
{
return from(new FinalizeResultsQueryRunner<>(toolChest.postMergeQueryDecoration(baseRunner), toolChest));
}
public FluentQueryRunner applyPreMergeDecoration()
{
return from(new UnionQueryRunner<>(toolChest.preMergeQueryDecoration(baseRunner)));
}
public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter)
{
return emitCPUTimeMetric(emitter, new AtomicLong(0L));
}
public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter, AtomicLong accumulator)
{
return from(
CPUTimeMetricQueryRunner.safeBuild(
baseRunner,
toolChest,
emitter,
accumulator,
true
)
);
}
public FluentQueryRunner postProcess(PostProcessingOperator<T> postProcessing)
{
return from(postProcessing != null ? postProcessing.postProcess(baseRunner) : baseRunner);
}
public FluentQueryRunner mergeResults()
{
return from(toolChest.mergeResults(baseRunner));
}
public FluentQueryRunner map(final Function<QueryRunner<T>, QueryRunner<T>> mapFn)
{
return from(mapFn.apply(baseRunner));
}
}
}

View File

@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Function;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.UOE;
@ -83,7 +84,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Perform any per-query decoration of an {@link ObjectMapper} that enables it to read and write objects of the
* query's {@link ResultType}. It is used by QueryResource on the write side, and DirectDruidClient on the read side.
*
* <p>
* For most queries, this is a no-op, but it can be useful for query types that support more than one result
* serialization format. Queries that implement this method must not modify the provided ObjectMapper, but instead
* must return a copy.
@ -96,16 +97,21 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* This method wraps a QueryRunner. The input QueryRunner, by contract, will provide a series of
* ResultType objects in time order (ascending or descending). This method should return a new QueryRunner that
* potentially merges the stream of ordered ResultType objects.
*
* merges the stream of ordered ResultType objects.
* <p>
* A default implementation constructs a {@link ResultMergeQueryRunner} which creates a
* {@link org.apache.druid.common.guava.CombiningSequence} using the supplied {@link QueryRunner} with
* {@link QueryToolChest#createResultComparator(Query)} and {@link QueryToolChest#createMergeFn(Query)}} supplied by this
* toolchest.
* {@link QueryToolChest#createResultComparator(Query)} and {@link QueryToolChest#createMergeFn(Query)}} supplied
* by this toolchest.
* <p>
* Generally speaking, the logic that exists in makePostComputeManipulatorFn should actually exist in this method.
* Additionally, if a query supports PostAggregations, this method should take steps to ensure that it computes
* PostAggregations a minimum number of times. This is most commonly achieved by computing the PostAgg results
* during merge <strong>and also</strong> rewriting the query such that it has the minimum number of PostAggs (most
* often zero).
*
* @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending or descending)
*
* @return a QueryRunner that potentially merges the stream of ordered ResultType objects
* @return a QueryRunner that merges the stream of ordered ResultType objects
*/
public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner)
{
@ -118,7 +124,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* {@link QueryToolChest#mergeResults(QueryRunner)} and also used in
* {@link org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence} by 'CachingClusteredClient' if it
* does not return null.
*
* <p>
* Returning null from this function means that a query does not support result merging, at
* least via the mechanisms that utilize this function.
*/
@ -134,7 +140,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
*/
public Comparator<ResultType> createResultComparator(Query<ResultType> query)
{
throw new UOE("%s doesn't provide a result comparator", query.getClass().getName());
throw DruidException.defensive("%s doesn't provide a result comparator", query.getClass().getName());
}
/**
@ -155,7 +161,6 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* query passed on the created QueryMetrics object before returning.
*
* @param query The query that is being processed
*
* @return A QueryMetrics that can be used to make metrics for the provided query
*/
public abstract QueryMetrics<? super QueryType> makeMetrics(QueryType query);
@ -164,15 +169,29 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* Creates a Function that can take in a ResultType and return a new ResultType having applied
* the MetricManipulatorFn to each of the metrics.
* <p>
* This exists because the QueryToolChest is the only thing that understands the internal serialization
* format of ResultType, so it's primary responsibility is to "decompose" that structure and apply the
* given function to all metrics.
* This function's primary purpose is to help work around some challenges that exist around deserializing
* results across the wire. Specifically, different aggregators will generate different object types in a
* result set, if we wanted jackson to be able to deserialize these directly, we'd need to generate a response
* class for each query that jackson could use to deserialize things. That is not what we do. Instead, we have
* jackson deserialize Object instances and then use a MetricManipulatorFn to convert from those object instances
* to the actual object that the aggregator expects. As such, this would be more effectively named
* "makeObjectDeserializingFn".
* <p>
* It is safe and acceptable for implementations of this method to first validate that the MetricManipulationFn
* is {@link org.apache.druid.query.aggregation.MetricManipulatorFns#DESERIALIZING_INSTANCE} and throw an exception
* if it is not. If such an exception is ever thrown, it is indicative of a bug in the caller which should be fixed
* by not calling this method with anything other than the deserializing manipulator function.
* <p>
* There are some implementations where this was also tasked with computing PostAggregators, but this is actually
* not a good place to compute those as this function can be called in a number of cases when PostAggs are not
* really meaningful to compute. Instead, PostAggs should be computed in the mergeResults call and the
* mergeResults implementation should take care to ensure that PostAggs are only computed the minimum number of
* times necessary.
* <p>
* This function is called very early in the processing pipeline on the Broker.
*
* @param query The Query that is currently being processed
* @param fn The function that should be applied to all metrics in the results
*
* @return A function that will apply the provided fn to all metrics in the input ResultType object
*/
public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
@ -181,14 +200,18 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
);
/**
* Generally speaking this is the exact same thing as makePreComputeManipulatorFn. It is leveraged in order to
* compute PostAggregators on results after they have been completely merged together. To minimize walks of segments,
* it is recommended to use mergeResults() call instead of this method if possible. However, this may not always be
* possible as we dont always want to run PostAggregators and other stuff that happens there when you mergeResults.
* This manipulator functions primary purpose is to conduct finalization of aggregator values. It would be better
* named "makeFinalizingManipulatorFn", even that should really be done as part of {@link #mergeResults} instead
* of with this separate method.
* <p>
* It is safe and acceptable for implementations of this method to first validate that the MetricManipulationFn
* is either {@link org.apache.druid.query.aggregation.MetricManipulatorFns#FINALIZING_INSTANCE} or
* {@link org.apache.druid.query.aggregation.MetricManipulatorFns#IDENTITY_INSTANCE} and throw an exception
* if it is not. If such an exception is ever thrown, it is indicative of a bug in the caller which should be fixed
* by not calling this method with unsupported manipulator functions.
*
* @param query The Query that is currently being processed
* @param fn The function that should be applied to all metrics in the results
*
* @return A function that will apply the provided fn to all metrics in the input ResultType object
*/
public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
@ -211,7 +234,6 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
*
* @param query The query whose results might be cached
* @param <T> The type of object that will be stored in the cache
*
* @return A CacheStrategy that can be used to populate and read from the Cache
*/
@Nullable
@ -231,7 +253,6 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* override this method and instead apply anything that might be needed here in the mergeResults() call.
*
* @param runner The runner to be wrapped
*
* @return The wrapped runner
*/
public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner)
@ -249,7 +270,6 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* override this method and instead apply anything that might be needed here in the mergeResults() call.
*
* @param runner The runner to be wrapped
*
* @return The wrapped runner
*/
public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner)
@ -265,7 +285,6 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* @param query The query being processed
* @param segments The list of candidate segments to be queried
* @param <T> A Generic parameter because Java is cool
*
* @return The list of segments to actually query
*/
public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments)
@ -275,10 +294,10 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Returns whether this toolchest is able to handle the provided subquery.
*
* <p>
* When this method returns true, the core query stack will pass subquery datasources over to the toolchest and will
* assume they are properly handled.
*
* <p>
* When this method returns false, the core query stack will throw an error if subqueries are present. In the future,
* instead of throwing an error, the core query stack will handle the subqueries on its own.
*/
@ -292,9 +311,7 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* be the same length as each array returned by {@link #resultsAsArrays}.
*
* @param query same query passed to {@link #resultsAsArrays}
*
* @return row signature
*
* @throws UnsupportedOperationException if this query type does not support returning results as arrays
*/
public RowSignature resultArraySignature(QueryType query)
@ -307,25 +324,23 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* {@link #resultArraySignature}. This functionality is useful because it allows higher-level processors to operate on
* the results of any query in a consistent way. This is useful for the SQL layer and for any algorithm that might
* operate on the results of an inner query.
*
* <p>
* Not all query types support this method. They will throw {@link UnsupportedOperationException}, and they cannot
* be used by the SQL layer or by generic higher-level algorithms.
*
* <p>
* Some query types return less information after translating their results into arrays, especially in situations
* where there is no clear way to translate fully rich results into flat arrays. For example, the scan query does not
* include the segmentId in its array-based results, because it could potentially conflict with a 'segmentId' field
* in the actual datasource being scanned.
*
* <p>
* It is possible that there will be multiple arrays returned for a single result object. For example, in the topN
* query, each {@link org.apache.druid.query.topn.TopNResultValue} will generate a separate array for each of its
* {@code values}.
*
* <p>
* By convention, the array form should include the __time column, if present, as a long (milliseconds since epoch).
*
* @param resultSequence results of the form returned by {@link #mergeResults}
*
* @return results in array form
*
* @throws UnsupportedOperationException if this query type does not support returning results as arrays
*/
public Sequence<Object[]> resultsAsArrays(QueryType query, Sequence<ResultType> resultSequence)
@ -338,14 +353,15 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* is the one give by {@link #resultArraySignature(Query)}. If the toolchest doesn't support this method, then it can
* return an empty optional. It is the duty of the callees to throw an appropriate exception in that case or use an
* alternative fallback approach
*
* <p>
* Check documentation of {@link #resultsAsArrays(Query, Sequence)} as the behaviour of the rows represented by the
* frame sequence is identical.
*
* <p>
* Each Frame has a separate {@link RowSignature} because for some query types like the Scan query, every
* column in the final result might not be present in the individual ResultType (and subsequently Frame). Therefore,
* this is done to preserve the space by not populating the column in that particular Frame and omitting it from its
* signature
*
* @param query Query being executed by the toolchest. Used to determine the rowSignature of the Frames
* @param resultSequence results of the form returned by {@link #mergeResults(QueryRunner)}
* @param memoryAllocatorFactory

View File

@ -155,20 +155,37 @@ public class AggregatorUtil
public static final byte ARRAY_OF_DOUBLES_SKETCH_TO_METRICS_SUM_ESTIMATE_CACHE_TYPE_ID = 0x4E;
/**
* returns the list of dependent postAggregators that should be calculated in order to calculate given postAgg
* Given a list of PostAggregators and the name of an output column, returns the minimal list of PostAggregators
* required to compute the output column.
*
* If the outputColumn does not exist in the list of PostAggregators, the return list will be empty (under the
* assumption that the outputColumn comes from a project, aggregation or really anything other than a
* PostAggregator).
*
* If the outputColumn <strong>does</strong> exist in the list of PostAggregators, then the return list will have at
* least one element. If the PostAggregator with outputName depends on any other PostAggregators, then the returned
* list will contain all PostAggregators required to compute the outputColumn.
*
* Note that PostAggregators are processed in list-order, meaning that for a PostAggregator to depend on another
* PostAggregator, the "depender" must exist *after* the "dependee" in the list. That is, if PostAggregator A
* depends on PostAggregator B, then the list should be [B, A], such that A is computed after B.
*
* @param postAggregatorList List of postAggregator, there is a restriction that the list should be in an order such
* that all the dependencies of any given aggregator should occur before that aggregator.
* See AggregatorUtilTest.testOutOfOrderPruneDependentPostAgg for example.
* @param postAggName name of the postAgg on which dependency is to be calculated
* @param outputName name of the postAgg on which dependency is to be calculated
*
* @return the list of dependent postAggregators
*/
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String postAggName)
public static List<PostAggregator> pruneDependentPostAgg(List<PostAggregator> postAggregatorList, String outputName)
{
if (postAggregatorList.isEmpty()) {
return postAggregatorList;
}
ArrayList<PostAggregator> rv = new ArrayList<>();
Set<String> deps = new HashSet<>();
deps.add(postAggName);
deps.add(outputName);
// Iterate backwards to find the last calculated aggregate and add dependent aggregator as we find dependencies
// in reverse order
for (PostAggregator agg : Lists.reverse(postAggregatorList)) {

View File

@ -163,7 +163,7 @@ public class TopNLexicographicResultBuilder implements TopNResultBuilder
);
return new Result<>(
timestamp,
new TopNResultValue(Lists.transform(Arrays.asList(holderValueArray), DimValHolder::getMetricValues))
TopNResultValue.create(Lists.transform(Arrays.asList(holderValueArray), DimValHolder::getMetricValues))
);
}

View File

@ -243,6 +243,6 @@ public class TopNNumericResultBuilder implements TopNResultBuilder
// Pull out top aggregated values
final List<Map<String, Object>> values = Lists.transform(holderValues, DimValHolder::getMetricValues);
return new Result<>(timestamp, new TopNResultValue(values));
return new Result<>(timestamp, TopNResultValue.create(values));
}
}

View File

@ -39,7 +39,6 @@ import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -202,6 +201,11 @@ public class TopNQuery extends BaseQuery<Result<TopNResultValue>>
return new TopNQueryBuilder(this).aggregators(aggregatorSpecs).build();
}
public TopNQuery withPostAggregatorSpecs(List<PostAggregator> postAggs)
{
return new TopNQueryBuilder(this).postAggregators(postAggs).build();
}
@Override
public Query<Result<TopNResultValue>> withDataSource(DataSource dataSource)
{

View File

@ -26,6 +26,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.allocation.MemoryAllocatorFactory;
@ -48,9 +49,11 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.context.ResponseContext;
@ -63,12 +66,11 @@ import org.joda.time.DateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;
/**
*
@ -116,10 +118,12 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public BinaryOperator<Result<TopNResultValue>> createMergeFn(
Query<Result<TopNResultValue>> query
)
public QueryRunner<Result<TopNResultValue>> mergeResults(QueryRunner<Result<TopNResultValue>> runner)
{
final ResultMergeQueryRunner<Result<TopNResultValue>> delegateRunner = new ResultMergeQueryRunner<>(
runner,
query -> ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending()),
query -> {
TopNQuery topNQuery = (TopNQuery) query;
return new TopNBinaryFn(
topNQuery.getGranularity(),
@ -130,11 +134,41 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
topNQuery.getPostAggregatorSpecs()
);
}
);
return (queryPlus, responseContext) -> {
final TopNQuery query = (TopNQuery) queryPlus.getQuery();
final List<PostAggregator> prunedPostAggs = prunePostAggregators(query);
@Override
public Comparator<Result<TopNResultValue>> createResultComparator(Query<Result<TopNResultValue>> query)
{
return ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending());
//noinspection unchecked
return (Sequence) delegateRunner.run(
// Rewrite to prune the post aggs for downstream queries to only the minimum required. That is, if
// the topN query sorts by the PostAgg, then it must be pushed down, otherwise, it can be pruned.
queryPlus.withQuery(query.withPostAggregatorSpecs(prunedPostAggs)),
responseContext
).map(
result -> {
final List<PostAggregator> postAggs = query.getPostAggregatorSpecs();
if (query.context().isBySegment()) {
@SuppressWarnings("unchecked")
final BySegmentResultValue<Result<TopNResultValue>> bySeg =
(BySegmentResultValue<Result<TopNResultValue>>) result.getValue();
final List<Result<TopNResultValue>> results = bySeg.getResults();
final List<Result<TopNResultValue>> resultValues = new ArrayList<>(results.size());
for (Result<TopNResultValue> bySegResult : results) {
resultValues.add(resultWithPostAggs(postAggs, bySegResult));
}
return new Result<>(
result.getTimestamp(),
new BySegmentTopNResultValue(resultValues, bySeg.getSegmentId(), bySeg.getInterval())
);
} else {
return resultWithPostAggs(postAggs, result);
}
}
);
};
}
@Override
@ -151,10 +185,13 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
final MetricManipulationFn fn
)
{
//noinspection ObjectEquality
if (MetricManipulatorFns.deserializing() != fn) {
throw DruidException.defensive("This method can only be used to deserialize.");
}
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
private String dimension = query.getDimensionSpec().getOutputName();
private final List<PostAggregator> prunedAggs = prunePostAggregators(query);
private final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
@ -162,119 +199,68 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
result.getValue(),
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMapWithExpectedSize(
aggregatorFactories.length
+ prunedAggs.size()
+ 1
);
final List<DimensionAndMetricValueExtractor> values = result.getValue().getValue();
final List<DimensionAndMetricValueExtractor> newValues = new ArrayList<>(values.size());
for (DimensionAndMetricValueExtractor input : values) {
final Map<String, Object> map = new LinkedHashMap<>(input.getBaseObject());
for (int i = 0; i < aggregatorFactories.length; ++i) {
final String aggName = aggFactoryNames[i];
values.put(aggName, fn.manipulate(aggregatorFactories[i], input.getMetric(aggName)));
map.put(aggName, aggregatorFactories[i].deserialize(map.get(aggName)));
}
for (PostAggregator postAgg : prunedAggs) {
final String name = postAgg.getName();
Object calculatedPostAgg = input.getMetric(name);
if (calculatedPostAgg != null) {
values.put(name, calculatedPostAgg);
} else {
values.put(name, postAgg.compute(values));
newValues.add(new DimensionAndMetricValueExtractor(map));
}
}
values.put(dimension, input.getDimensionValue(dimension));
return values;
}
}
)
);
return new Result<TopNResultValue>(
result.getTimestamp(),
new TopNResultValue(serializedValues)
);
return new Result<>(result.getTimestamp(), new TopNResultValue(newValues));
}
};
}
@SuppressWarnings("ObjectEquality")
@Override
public Function<Result<TopNResultValue>, Result<TopNResultValue>> makePostComputeManipulatorFn(
final TopNQuery query,
final MetricManipulationFn fn
TopNQuery query,
MetricManipulationFn fn
)
{
if (MetricManipulatorFns.identity() == fn) {
return result -> result;
}
if (MetricManipulatorFns.finalizing() == fn) {
return new Function<Result<TopNResultValue>, Result<TopNResultValue>>()
{
private String dimension = query.getDimensionSpec().getOutputName();
private final AggregatorFactory[] aggregatorFactories = query.getAggregatorSpecs()
.toArray(new AggregatorFactory[0]);
private final String[] aggFactoryNames = extractFactoryName(query.getAggregatorSpecs());
private final PostAggregator[] postAggregators = query.getPostAggregatorSpecs().toArray(new PostAggregator[0]);
@Override
public Result<TopNResultValue> apply(Result<TopNResultValue> result)
{
List<Map<String, Object>> serializedValues = Lists.newArrayList(
Iterables.transform(
result.getValue(),
new Function<DimensionAndMetricValueExtractor, Map<String, Object>>()
{
@Override
public Map<String, Object> apply(DimensionAndMetricValueExtractor input)
{
final Map<String, Object> values = Maps.newHashMapWithExpectedSize(
aggregatorFactories.length
+ query.getPostAggregatorSpecs().size()
+ 1
);
final List<DimensionAndMetricValueExtractor> values = result.getValue().getValue();
final List<DimensionAndMetricValueExtractor> newValues = new ArrayList<>(values.size());
// Put non-finalized aggregators before post-aggregators.
for (final String name : aggFactoryNames) {
values.put(name, input.getMetric(name));
for (DimensionAndMetricValueExtractor input : values) {
final Map<String, Object> map = new LinkedHashMap<>(input.getBaseObject());
for (int i = 0; i < aggregatorFactories.length; ++i) {
final String aggName = aggFactoryNames[i];
map.put(aggName, aggregatorFactories[i].finalizeComputation(map.get(aggName)));
}
// Put dimension, post-aggregators might depend on it.
values.put(dimension, input.getDimensionValue(dimension));
// Put post-aggregators.
for (PostAggregator postAgg : postAggregators) {
Object calculatedPostAgg = input.getMetric(postAgg.getName());
if (calculatedPostAgg != null) {
values.put(postAgg.getName(), calculatedPostAgg);
} else {
values.put(postAgg.getName(), postAgg.compute(values));
}
newValues.add(new DimensionAndMetricValueExtractor(map));
}
// Put finalized aggregators now that post-aggregators are done.
for (int i = 0; i < aggFactoryNames.length; ++i) {
final String name = aggFactoryNames[i];
values.put(name, fn.manipulate(aggregatorFactories[i], input.getMetric(name)));
}
return values;
}
}
)
);
return new Result<>(
result.getTimestamp(),
new TopNResultValue(serializedValues)
);
return new Result<>(result.getTimestamp(), new TopNResultValue(newValues));
}
};
}
throw DruidException.defensive("This method can only be used to finalize.");
}
@Override
public TypeReference<Result<TopNResultValue>> getResultTypeReference()
{
@ -425,7 +411,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
retVal.add(vals);
}
return new Result<>(timestamp, new TopNResultValue(retVal));
return new Result<>(timestamp, TopNResultValue.create(retVal));
}
};
}
@ -486,7 +472,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new Result<TopNResultValue>(
input.getTimestamp(),
new TopNResultValue(
TopNResultValue.create(
Lists.transform(
resultValue.getValue(),
new Function<DimensionAndMetricValueExtractor, DimensionAndMetricValueExtractor>()
@ -592,6 +578,24 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)));
}
private Result<TopNResultValue> resultWithPostAggs(List<PostAggregator> postAggs, Result<TopNResultValue> result)
{
final List<DimensionAndMetricValueExtractor> values = result.getValue().getValue();
final List<DimensionAndMetricValueExtractor> newValues = new ArrayList<>(values.size());
for (DimensionAndMetricValueExtractor input : values) {
final Map<String, Object> map = new LinkedHashMap<>(input.getBaseObject());
for (PostAggregator postAgg : postAggs) {
map.put(postAgg.getName(), postAgg.compute(map));
}
newValues.add(new DimensionAndMetricValueExtractor(map));
}
return new Result<>(result.getTimestamp(), new TopNResultValue(newValues));
}
static class ThresholdAdjustingQueryRunner implements QueryRunner<Result<TopNResultValue>>
{
private final QueryRunner<Result<TopNResultValue>> runner;
@ -618,7 +622,8 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
final TopNQuery query = (TopNQuery) input;
final int minTopNThreshold = query.context().getInt(QueryContexts.MIN_TOP_N_THRESHOLD, config.getMinTopNThreshold());
final int minTopNThreshold = query.context()
.getInt(QueryContexts.MIN_TOP_N_THRESHOLD, config.getMinTopNThreshold());
if (query.getThreshold() > minTopNThreshold) {
return runner.run(queryPlus, responseContext);
}
@ -648,7 +653,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
{
return new Result<>(
input.getTimestamp(),
new TopNResultValue(
TopNResultValue.create(
Lists.<Object>newArrayList(
Iterables.limit(
input.getValue(),
@ -668,7 +673,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
return new Result<>(
input.getTimestamp(),
new TopNResultValue(
TopNResultValue.create(
Lists.<Object>newArrayList(
Iterables.limit(
input.getValue(),

View File

@ -25,28 +25,29 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.IAE;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
*
*/
public class TopNResultValue implements Iterable<DimensionAndMetricValueExtractor>
{
private final List<DimensionAndMetricValueExtractor> value;
private final List<DimensionAndMetricValueExtractor> valueList;
@JsonCreator
public TopNResultValue(List<?> value)
public static TopNResultValue create(List<?> value)
{
this.value = (value == null) ? new ArrayList<>() : Lists.transform(
if (value == null) {
return new TopNResultValue(new ArrayList<>());
}
return new TopNResultValue(Lists.transform(
value,
new Function<Object, DimensionAndMetricValueExtractor>()
{
@Override
public DimensionAndMetricValueExtractor apply(@Nullable Object input)
{
(Function<Object, DimensionAndMetricValueExtractor>) input -> {
if (input instanceof Map) {
return new DimensionAndMetricValueExtractor((Map) input);
} else if (input instanceof DimensionAndMetricValueExtractor) {
@ -55,27 +56,31 @@ public class TopNResultValue implements Iterable<DimensionAndMetricValueExtracto
throw new IAE("Unknown type for input[%s]", input.getClass());
}
}
));
}
);
public TopNResultValue(List<DimensionAndMetricValueExtractor> valueList)
{
this.valueList = valueList;
}
@JsonValue
public List<DimensionAndMetricValueExtractor> getValue()
{
return value;
return valueList;
}
@Override
public Iterator<DimensionAndMetricValueExtractor> iterator()
{
return value.iterator();
return valueList.iterator();
}
@Override
public String toString()
{
return "TopNResultValue{" +
"value=" + value +
"valueList=" + valueList +
'}';
}
@ -91,16 +96,12 @@ public class TopNResultValue implements Iterable<DimensionAndMetricValueExtracto
TopNResultValue that = (TopNResultValue) o;
if (value != null ? !value.equals(that.value) : that.value != null) {
return false;
}
return true;
return Objects.equals(valueList, that.valueList);
}
@Override
public int hashCode()
{
return value != null ? value.hashCode() : 0;
return valueList != null ? valueList.hashCode() : 0;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.java.util.metrics;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import java.util.List;
@ -52,7 +53,7 @@ public interface MetricsVerifier
default void verifyEmitted(String metricName, Map<String, Object> dimensionFilters, int times)
{
Assert.assertEquals(
"Metric was emitted unexpected number of times.",
StringUtils.format("Metric [%s] was emitted unexpected number of times.", metricName),
times,
getMetricValues(metricName, dimensionFilters).size()
);

View File

@ -1117,7 +1117,7 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.of(
"tags", "t3",
@ -1184,7 +1184,7 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
expected
)
)
@ -1242,7 +1242,7 @@ public class MultiValuedDimensionTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
expected
)
)

View File

@ -51,7 +51,6 @@ import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFact
import org.apache.druid.query.aggregation.post.ArithmeticPostAggregator;
import org.apache.druid.query.aggregation.post.ConstantPostAggregator;
import org.apache.druid.query.aggregation.post.FieldAccessPostAggregator;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
@ -60,13 +59,11 @@ import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineObjectHolder;
@ -76,13 +73,13 @@ import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -149,12 +146,30 @@ public class QueryRunnerTestHelper
"index",
INDEX_METRIC
);
public static final LongMinAggregatorFactory INDEX_LONG_MIN = new LongMinAggregatorFactory(LONG_MIN_INDEX_METRIC, INDEX_METRIC);
public static final LongMaxAggregatorFactory INDEX_LONG_MAX = new LongMaxAggregatorFactory(LONG_MAX_INDEX_METRIC, INDEX_METRIC);
public static final DoubleMinAggregatorFactory INDEX_DOUBLE_MIN = new DoubleMinAggregatorFactory(DOUBLE_MIN_INDEX_METRIC, INDEX_METRIC);
public static final DoubleMaxAggregatorFactory INDEX_DOUBLE_MAX = new DoubleMaxAggregatorFactory(DOUBLE_MAX_INDEX_METRIC, INDEX_METRIC);
public static final FloatMinAggregatorFactory INDEX_FLOAT_MIN = new FloatMinAggregatorFactory(FLOAT_MIN_INDEX_METRIC, INDEX_METRIC);
public static final FloatMaxAggregatorFactory INDEX_FLOAT_MAX = new FloatMaxAggregatorFactory(FLOAT_MAX_INDEX_METRIC, INDEX_METRIC);
public static final LongMinAggregatorFactory INDEX_LONG_MIN = new LongMinAggregatorFactory(
LONG_MIN_INDEX_METRIC,
INDEX_METRIC
);
public static final LongMaxAggregatorFactory INDEX_LONG_MAX = new LongMaxAggregatorFactory(
LONG_MAX_INDEX_METRIC,
INDEX_METRIC
);
public static final DoubleMinAggregatorFactory INDEX_DOUBLE_MIN = new DoubleMinAggregatorFactory(
DOUBLE_MIN_INDEX_METRIC,
INDEX_METRIC
);
public static final DoubleMaxAggregatorFactory INDEX_DOUBLE_MAX = new DoubleMaxAggregatorFactory(
DOUBLE_MAX_INDEX_METRIC,
INDEX_METRIC
);
public static final FloatMinAggregatorFactory INDEX_FLOAT_MIN = new FloatMinAggregatorFactory(
FLOAT_MIN_INDEX_METRIC,
INDEX_METRIC
);
public static final FloatMaxAggregatorFactory INDEX_FLOAT_MAX = new FloatMaxAggregatorFactory(
FLOAT_MAX_INDEX_METRIC,
INDEX_METRIC
);
public static final String JS_COMBINE_A_PLUS_B = "function combine(a, b) { return a + b; }";
public static final String JS_RESET_0 = "function reset() { return 0; }";
public static final JavaScriptAggregatorFactory JS_INDEX_SUM_IF_PLACEMENTISH_A = new JavaScriptAggregatorFactory(
@ -374,44 +389,63 @@ public class QueryRunnerTestHelper
QueryRunnerFactory<T, QueryType> factory
)
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final IncrementalIndex noRollupRtIndex = TestIndex.getNoRollupIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex noRollupMMappedTestIndex = TestIndex.getNoRollupMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
final QueryableIndex frontCodedMappedTestIndex = TestIndex.getFrontCodedMMappedTestIndex();
BiFunction<String, Segment, QueryRunner<T>> maker = (name, seg) -> makeQueryRunner(factory, seg, name);
return ImmutableList.of(
makeQueryRunner(factory, new IncrementalIndexSegment(rtIndex, SEGMENT_ID), ("rtIndex")),
makeQueryRunner(factory, new IncrementalIndexSegment(noRollupRtIndex, SEGMENT_ID), "noRollupRtIndex"),
makeQueryRunner(factory, new QueryableIndexSegment(mMappedTestIndex, SEGMENT_ID), "mMappedTestIndex"),
makeQueryRunner(
factory,
new QueryableIndexSegment(noRollupMMappedTestIndex, SEGMENT_ID),
"noRollupMMappedTestIndex"
maker.apply(
"rtIndex",
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), SEGMENT_ID)
),
makeQueryRunner(factory, new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID), "mergedRealtimeIndex"),
makeQueryRunner(factory, new QueryableIndexSegment(frontCodedMappedTestIndex, SEGMENT_ID), "frontCodedMMappedTestIndex")
);
}
@SuppressWarnings("unchecked")
public static Collection<?> makeUnionQueryRunners(QueryRunnerFactory factory)
{
final IncrementalIndex rtIndex = TestIndex.getIncrementalTestIndex();
final QueryableIndex mMappedTestIndex = TestIndex.getMMappedTestIndex();
final QueryableIndex mergedRealtimeIndex = TestIndex.mergedRealtimeIndex();
return Arrays.asList(
makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, SEGMENT_ID), "rtIndex"),
makeUnionQueryRunner(factory, new QueryableIndexSegment(mMappedTestIndex, SEGMENT_ID), "mMappedTestIndex"),
makeUnionQueryRunner(
factory,
new QueryableIndexSegment(mergedRealtimeIndex, SEGMENT_ID),
"mergedRealtimeIndex"
maker.apply(
"noRollupRtIndex",
new IncrementalIndexSegment(TestIndex.getNoRollupIncrementalTestIndex(), SEGMENT_ID)
),
maker.apply(
"mMappedTestIndex",
new QueryableIndexSegment(TestIndex.getMMappedTestIndex(), SEGMENT_ID)
),
maker.apply(
"noRollupMMappedTestIndex",
new QueryableIndexSegment(TestIndex.getNoRollupMMappedTestIndex(), SEGMENT_ID)
),
maker.apply(
"mergedRealtimeIndex",
new QueryableIndexSegment(TestIndex.mergedRealtimeIndex(), SEGMENT_ID)
),
maker.apply(
"frontCodedMMappedTestIndex",
new QueryableIndexSegment(TestIndex.getFrontCodedMMappedTestIndex(), SEGMENT_ID)
)
);
}
public static <T, QueryType extends Query<T>> List<QueryRunner<T>> makeQueryRunnersToMerge(
QueryRunnerFactory<T, QueryType> factory
)
{
return mapQueryRunnersToMerge(factory, makeQueryRunners(factory));
}
public static <T, QueryType extends Query<T>> ArrayList<QueryRunner<T>> mapQueryRunnersToMerge(
QueryRunnerFactory<T, QueryType> factory,
List<QueryRunner<T>> runners
)
{
final ArrayList<QueryRunner<T>> retVal = new ArrayList<>(runners.size());
final QueryToolChest<T, QueryType> toolchest = factory.getToolchest();
for (QueryRunner<T> baseRunner : runners) {
retVal.add(
FluentQueryRunner.create(baseRunner, toolchest)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.setToString(baseRunner.toString())
);
}
return retVal;
}
public static <T, QueryType extends Query<T>> QueryRunner<T> makeQueryRunner(
QueryRunnerFactory<T, QueryType> factory,
@ -443,43 +477,20 @@ public class QueryRunnerTestHelper
final String runnerName
)
{
return new FinalizeResultsQueryRunner<T>(
new BySegmentQueryRunner<>(segmentId, adapter.getDataInterval().getStart(), factory.createRunner(adapter)),
(QueryToolChest<T, Query<T>>) factory.getToolchest()
//noinspection
return new BySegmentQueryRunner<T>(
segmentId,
adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
)
{
@Override
public String toString()
{
return runnerName;
}
};
}
public static <T> QueryRunner<T> makeUnionQueryRunner(
QueryRunnerFactory<T, Query<T>> factory,
Segment adapter,
final String runnerName
)
{
BySegmentQueryRunner<T> bySegmentQueryRunner =
new BySegmentQueryRunner<>(SEGMENT_ID, adapter.getDataInterval().getStart(), factory.createRunner(adapter));
final QueryRunner<T> runner = new FluentQueryRunnerBuilder<T>(factory.getToolchest())
.create(new UnionQueryRunner<>(bySegmentQueryRunner))
.mergeResults()
.applyPostMergeDecoration();
return new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
return runner.run(queryPlus, responseContext);
}
@Override
public String toString()
{
// Tests that use these QueryRunners directly are parameterized and use the toString of their QueryRunner as
// the name of the test. It would be better if the usages were adjusted to actually parameterize with an extra
// name parameter, or use a different object or something like that, but for now, we have to overload toString
// to name it so that the parameterization continues to work.
return runnerName;
}
};
@ -501,19 +512,16 @@ public class QueryRunnerTestHelper
return makeQueryRunner(factory, segmentReference, runnerName);
}
@SuppressWarnings({"rawtypes", "unchecked"})
public static <T> QueryRunner<T> makeFilteringQueryRunner(
final VersionedIntervalTimeline<String, ReferenceCountingSegment> timeline,
final QueryRunnerFactory<T, Query<T>> factory
)
{
final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FluentQueryRunnerBuilder<T>(toolChest)
return FluentQueryRunner
.create(
new QueryRunner<T>()
{
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
(queryPlus, responseContext) -> {
Query<T> query = queryPlus.getQuery();
List<TimelineObjectHolder> segments = new ArrayList<>();
for (Interval interval : query.getIntervals()) {
@ -539,8 +547,8 @@ public class QueryRunnerTestHelper
sequences.add(factory.createRunner(segment).run(queryPlusRunning, responseContext));
}
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
}
}
},
toolChest
)
.applyPreMergeDecoration()
.mergeResults()

View File

@ -477,6 +477,7 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
BUFFER_POOLS = null;
}
@SuppressWarnings("unused")
public GroupByQueryRunnerTest(
String testName,
GroupByQueryConfig config,
@ -489,7 +490,6 @@ public class GroupByQueryRunnerTest extends InitializedNullHandlingTest
this.factory = factory;
this.runner = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner));
this.originalRunner = runner;
String runnerName = runner.toString();
this.vectorize = vectorize;
}

View File

@ -45,7 +45,6 @@ public class GroupByQueryRunnerTestHelper
{
public static <T> Iterable<T> runQuery(QueryRunnerFactory factory, QueryRunner runner, Query<T> query)
{
QueryToolChest toolChest = factory.getToolchest();
QueryRunner<T> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(runner)),

View File

@ -28,17 +28,14 @@ import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
@ -101,7 +98,7 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
final List<Object[]> constructors = new ArrayList<>();
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunners(factory)) {
for (QueryRunner<ResultRow> runner : QueryRunnerTestHelper.makeQueryRunnersToMerge(factory)) {
final QueryRunner modifiedRunner = new QueryRunner()
{
@Override
@ -109,15 +106,6 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
{
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
QueryRunner<ResultRow> newRunner = factory.mergeRunners(
Execs.directExecutor(), ImmutableList.of(runner)
);
QueryToolChest toolChest = factory.getToolchest();
newRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(toolChest.preMergeQueryDecoration(newRunner)),
toolChest
);
final String timeDimension = tsQuery.getTimestampResultField();
final List<VirtualColumn> virtualColumns = new ArrayList<>(
@ -163,11 +151,12 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
.setAggregatorSpecs(tsQuery.getAggregatorSpecs())
.setPostAggregatorSpecs(tsQuery.getPostAggregatorSpecs())
.setVirtualColumns(VirtualColumns.create(virtualColumns))
.setLimit(tsQuery.getLimit())
.setContext(theContext)
.build();
return Sequences.map(
newRunner.run(queryPlus.withQuery(newQuery), responseContext),
runner.run(queryPlus.withQuery(newQuery), responseContext),
new Function<ResultRow, Result<TimeseriesResultValue>>()
{
@Override
@ -275,8 +264,16 @@ public class GroupByTimeseriesQueryRunnerTest extends TimeseriesQueryRunnerTest
);
Assert.assertEquals(59L, (long) result.getValue().getLongMetric(QueryRunnerTestHelper.LONG_MIN_INDEX_METRIC));
Assert.assertEquals(1870, (long) result.getValue().getLongMetric(QueryRunnerTestHelper.LONG_MAX_INDEX_METRIC));
Assert.assertEquals(59.021022D, result.getValue().getDoubleMetric(QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC), 0);
Assert.assertEquals(1870.061029D, result.getValue().getDoubleMetric(QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC), 0);
Assert.assertEquals(
59.021022D,
result.getValue().getDoubleMetric(QueryRunnerTestHelper.DOUBLE_MIN_INDEX_METRIC),
0
);
Assert.assertEquals(
1870.061029D,
result.getValue().getDoubleMetric(QueryRunnerTestHelper.DOUBLE_MAX_INDEX_METRIC),
0
);
Assert.assertEquals(59.021023F, result.getValue().getFloatMetric(QueryRunnerTestHelper.FLOAT_MIN_INDEX_METRIC), 0);
Assert.assertEquals(1870.061F, result.getValue().getFloatMetric(QueryRunnerTestHelper.FLOAT_MAX_INDEX_METRIC), 0);
}

View File

@ -42,6 +42,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
@ -50,15 +51,17 @@ import java.util.List;
public class SegmentMetadataUnionQueryTest extends InitializedNullHandlingTest
{
private static final QueryRunnerFactory FACTORY = new SegmentMetadataQueryRunnerFactory(
private static final QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery> FACTORY =
new SegmentMetadataQueryRunnerFactory(
new SegmentMetadataQueryQueryToolChest(new SegmentMetadataQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
private final QueryRunner runner;
private final QueryRunner<SegmentAnalysis> runner;
private final boolean mmap;
public SegmentMetadataUnionQueryTest(
QueryRunner runner,
QueryRunner<SegmentAnalysis> runner,
boolean mmap
)
{
@ -69,23 +72,25 @@ public class SegmentMetadataUnionQueryTest extends InitializedNullHandlingTest
@Parameterized.Parameters
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{
QueryRunnerTestHelper.makeUnionQueryRunner(
final ArrayList<QueryRunner<SegmentAnalysis>> runners = QueryRunnerTestHelper.mapQueryRunnersToMerge(
FACTORY,
ImmutableList.of(
QueryRunnerTestHelper.makeQueryRunner(
FACTORY,
new QueryableIndexSegment(TestIndex.getMMappedTestIndex(), QueryRunnerTestHelper.SEGMENT_ID),
null
),
true,
},
new Object[]{
QueryRunnerTestHelper.makeUnionQueryRunner(
QueryRunnerTestHelper.makeQueryRunner(
FACTORY,
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(), QueryRunnerTestHelper.SEGMENT_ID),
null
),
false
}
)
)
);
return ImmutableList.of(
new Object[]{runners.get(0), true},
new Object[]{runners.get(1), false}
);
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -106,8 +107,10 @@ public class SearchQueryRunnerTest extends InitializedNullHandlingTest
)
{
this.runner = runner;
this.decoratedRunner = TOOL_CHEST.postMergeQueryDecoration(
TOOL_CHEST.mergeResults(TOOL_CHEST.preMergeQueryDecoration(runner)));
this.decoratedRunner = FluentQueryRunner.create(runner, TOOL_CHEST)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration();
}
@Test

View File

@ -60,7 +60,7 @@ public class TimeSeriesUnionQueryRunnerTest extends InitializedNullHandlingTest
public static Iterable<Object[]> constructorFeeder()
{
return QueryRunnerTestHelper.cartesian(
QueryRunnerTestHelper.makeUnionQueryRunners(
QueryRunnerTestHelper.makeQueryRunnersToMerge(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),

View File

@ -110,7 +110,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
{
final Iterable<Object[]> baseConstructors = QueryRunnerTestHelper.cartesian(
// runners
QueryRunnerTestHelper.makeQueryRunners(
QueryRunnerTestHelper.makeQueryRunnersToMerge(
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(),
new TimeseriesQueryEngine(),
@ -148,13 +148,13 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
TestHelper.assertExpectedResults(expectedResults, results);
}
protected final QueryRunner runner;
protected final QueryRunner<Result<TimeseriesResultValue>> runner;
protected final boolean descending;
protected final boolean vectorize;
private final List<AggregatorFactory> aggregatorFactoryList;
public TimeseriesQueryRunnerTest(
QueryRunner runner,
QueryRunner<Result<TimeseriesResultValue>> runner,
boolean descending,
boolean vectorize,
List<AggregatorFactory> aggregatorFactoryList
@ -2965,16 +2965,7 @@ public class TimeseriesQueryRunnerTest extends InitializedNullHandlingTest
.context(makeContext())
.build();
// Must create a toolChest so we can run mergeResults.
QueryToolChest<Result<TimeseriesResultValue>, TimeseriesQuery> toolChest = new TimeseriesQueryQueryToolChest();
// Must wrapped in a results finalizer to stop the runner's builtin finalizer from being called.
final FinalizeResultsQueryRunner finalRunner = new FinalizeResultsQueryRunner(
toolChest.mergeResults(runner),
toolChest
);
final List list = finalRunner.run(QueryPlus.wrap(query)).toList();
final List list = runner.run(QueryPlus.wrap(query)).toList();
Assert.assertEquals(10, list.size());
}

View File

@ -100,7 +100,7 @@ public class TopNBinaryFnBenchmark extends SimpleBenchmark
}
result1 = new Result<>(
currTime,
new TopNResultValue(list)
TopNResultValue.create(list)
);
List<Map<String, Object>> list2 = new ArrayList<>();
@ -116,7 +116,7 @@ public class TopNBinaryFnBenchmark extends SimpleBenchmark
}
result2 = new Result<>(
currTime,
new TopNResultValue(list2)
TopNResultValue.create(list2)
);
fn = new TopNBinaryFn(
Granularities.ALL,

View File

@ -83,7 +83,7 @@ public class TopNBinaryFnTest
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 1L,
@ -105,7 +105,7 @@ public class TopNBinaryFnTest
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 2L,
@ -128,7 +128,7 @@ public class TopNBinaryFnTest
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"testdim", "1",
@ -165,7 +165,7 @@ public class TopNBinaryFnTest
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 1L,
@ -187,7 +187,7 @@ public class TopNBinaryFnTest
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 2L,
@ -210,7 +210,7 @@ public class TopNBinaryFnTest
Result<TopNResultValue> expected = new Result<TopNResultValue>(
Granularities.DAY.bucketStart(currTime),
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"testdim", "1",
@ -246,7 +246,7 @@ public class TopNBinaryFnTest
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 1L,
@ -290,7 +290,7 @@ public class TopNBinaryFnTest
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 1L,
@ -315,7 +315,7 @@ public class TopNBinaryFnTest
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 2L,
@ -341,7 +341,7 @@ public class TopNBinaryFnTest
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"testdim", "other",
@ -385,7 +385,7 @@ public class TopNBinaryFnTest
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 1L,
@ -407,7 +407,7 @@ public class TopNBinaryFnTest
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime.plusHours(2),
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 2L,
@ -430,7 +430,7 @@ public class TopNBinaryFnTest
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"testdim", "1",
@ -466,7 +466,7 @@ public class TopNBinaryFnTest
{
Result<TopNResultValue> result1 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 1L,
@ -478,7 +478,7 @@ public class TopNBinaryFnTest
);
Result<TopNResultValue> result2 = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.<Map<String, Object>>of(
ImmutableMap.of(
"rows", 2L,
@ -496,7 +496,7 @@ public class TopNBinaryFnTest
Result<TopNResultValue> expected = new Result<TopNResultValue>(
currTime,
new TopNResultValue(
TopNResultValue.create(
ImmutableList.of(
resultMap
)

View File

@ -351,7 +351,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TopNResultValue(
TopNResultValue.create(
ImmutableList.of(
new DimensionAndMetricValueExtractor(
ImmutableMap.of("dim", "foo", "rows", 1L, "index", 2L, "uniques", 3L, "const", 1L)
@ -446,7 +446,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
final Result<TopNResultValue> result1 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.of(
"test", dimValue,
@ -474,7 +474,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
final Result<TopNResultValue> result2 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.of(
"test", dimValue,
@ -491,7 +491,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
if (valueType.is(ValueType.FLOAT)) {
typeAdjustedResult2 = new Result<>(
DateTimes.utc(123L),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.of(
"test", dimValue,
@ -505,7 +505,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
} else if (valueType.is(ValueType.LONG)) {
typeAdjustedResult2 = new Result<>(
DateTimes.utc(123L),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.of(
"test", dimValue,
@ -576,7 +576,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
final Result<TopNResultValue> result1 = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.of(
"test", dimValue,
@ -605,7 +605,7 @@ public class TopNQueryQueryToolChestTest extends InitializedNullHandlingTest
final Result<TopNResultValue> resultLevelCacheResult = new Result<>(
// test timestamps that result in integer size millis
DateTimes.utc(123L),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.of(
"test", dimValue,

View File

@ -42,7 +42,6 @@ import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -127,14 +126,14 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
RESOURCE_CLOSER.close();
}
@Parameterized.Parameters(name = "{0}")
@Parameterized.Parameters(name = "{7}")
public static Iterable<Object[]> constructorFeeder()
{
List<QueryRunner<Result<TopNResultValue>>> retVal = queryRunners();
List<Object[]> parameters = new ArrayList<>();
for (int i = 0; i < 32; i++) {
for (QueryRunner<Result<TopNResultValue>> firstParameter : retVal) {
Object[] params = new Object[7];
Object[] params = new Object[8];
params[0] = firstParameter;
params[1] = (i & 1) != 0;
params[2] = (i & 2) != 0;
@ -142,8 +141,10 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
params[4] = (i & 8) != 0;
params[5] = (i & 16) != 0;
params[6] = QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS;
Object[] params2 = Arrays.copyOf(params, 7);
params[7] = firstParameter + " double aggs";
Object[] params2 = Arrays.copyOf(params, 8);
params2[6] = QueryRunnerTestHelper.COMMON_FLOAT_AGGREGATORS;
params2[7] = firstParameter + " float aggs";
parameters.add(params);
parameters.add(params2);
}
@ -161,7 +162,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<QueryRunner<Result<TopNResultValue>>> retVal = new ArrayList<>();
retVal.addAll(
QueryRunnerTestHelper.makeQueryRunners(
QueryRunnerTestHelper.makeQueryRunnersToMerge(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
@ -170,7 +171,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
)
);
retVal.addAll(
QueryRunnerTestHelper.makeQueryRunners(
QueryRunnerTestHelper.makeQueryRunnersToMerge(
new TopNQueryRunnerFactory(
customPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
@ -198,6 +199,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
@SuppressWarnings("unused")
public TopNQueryRunnerTest(
QueryRunner<Result<TopNResultValue>> runner,
boolean specializeGeneric1AggPooledTopN,
@ -205,7 +207,8 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
boolean specializeHistorical1SimpleDoubleAggPooledTopN,
boolean specializeHistoricalSingleValueDimSelector1SimpleDoubleAggPooledTopN,
boolean duplicateSingleAggregatorQueries,
List<AggregatorFactory> commonAggregators
List<AggregatorFactory> commonAggregators,
String testName
)
{
this.runner = runner;
@ -265,12 +268,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
private Sequence<Result<TopNResultValue>> runWithMerge(TopNQuery query, ResponseContext context)
{
final TopNQueryQueryToolChest chest = new TopNQueryQueryToolChest(new TopNQueryConfig());
final QueryRunner<Result<TopNResultValue>> mergeRunner = new FinalizeResultsQueryRunner(
chest.mergeResults(runner),
chest
);
return mergeRunner.run(QueryPlus.wrap(query), context);
return runner.run(QueryPlus.wrap(query), context);
}
@Test
@ -301,7 +299,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = ImmutableList.of(
new Result<>(
DateTimes.of("2020-04-02T00:00:00.000Z"),
new TopNResultValue(ImmutableList.of())
TopNResultValue.create(ImmutableList.of())
)
);
assertExpectedResults(expectedResults, query);
@ -334,7 +332,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")
@ -399,7 +397,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(Collections.<Map<String, Object>>singletonList(resultMap))
TopNResultValue.create(Collections.<Map<String, Object>>singletonList(resultMap))
)
);
assertExpectedResults(expectedResults, query);
@ -421,7 +419,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("alias", "theValue")
@ -461,7 +459,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")
@ -531,7 +529,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "upfront")
@ -594,7 +592,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -646,7 +644,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -689,7 +687,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -737,7 +735,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -791,7 +789,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -834,7 +832,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
DateTimes.of("2011-01-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -856,7 +854,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-02-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -878,7 +876,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-03-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -900,7 +898,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -943,7 +941,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
DateTimes.of("2011-01-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -965,7 +963,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-02-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -987,7 +985,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-03-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -1009,7 +1007,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -1052,7 +1050,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Arrays.asList(
new Result<>(
DateTimes.of("2011-01-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -1074,7 +1072,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-02-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -1096,7 +1094,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-03-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -1118,7 +1116,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
),
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "total_market")
@ -1166,7 +1164,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"addRowsIndexConstant", 5356.814783D,
@ -1247,7 +1245,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -1295,7 +1293,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"market", "spot",
@ -1343,7 +1341,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -1391,7 +1389,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -1432,7 +1430,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
@ -1466,7 +1464,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
@ -1518,7 +1516,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "upfront",
@ -1566,7 +1564,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -1605,7 +1603,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.build();
assertExpectedResults(
Collections.singletonList(
new Result<>(DateTimes.of("2011-04-01T00:00:00.000Z"), new TopNResultValue(Collections.emptyList()))
new Result<>(DateTimes.of("2011-04-01T00:00:00.000Z"), TopNResultValue.create(Collections.emptyList()))
),
query
);
@ -1631,7 +1629,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
.build();
assertExpectedResults(
Collections.singletonList(
new Result<>(DateTimes.of("2011-04-01T00:00:00.000Z"), new TopNResultValue(Collections.emptyList()))
new Result<>(DateTimes.of("2011-04-01T00:00:00.000Z"), TopNResultValue.create(Collections.emptyList()))
),
query
);
@ -1726,7 +1724,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
final List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"placementish", "a",
@ -1767,7 +1765,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
final List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"placementish", "preferred",
@ -1815,7 +1813,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
final List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"placementish", "preferred",
@ -1869,7 +1867,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
QueryRunnerTestHelper.orderedMap(
"doesn't exist", null,
@ -1903,7 +1901,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
QueryRunnerTestHelper.orderedMap(
"doesn't exist", null,
@ -1937,7 +1935,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
QueryRunnerTestHelper.orderedMap(
"doesn't exist", null,
@ -1970,7 +1968,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
@ -2015,7 +2013,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot"
@ -2050,7 +2048,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -2090,7 +2088,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -2130,7 +2128,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -2170,7 +2168,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "total_market",
@ -2217,7 +2215,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.of(
"addRowsIndexConstant", 504542.5071372986D,
@ -2264,7 +2262,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.of(
"addRowsIndexConstant", 504542.5071372986D,
@ -2309,7 +2307,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.QUALITY_DIMENSION, "e",
@ -2374,7 +2372,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "s",
@ -2425,7 +2423,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "s"
@ -2476,7 +2474,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "2spot0",
@ -2540,7 +2538,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "2spot0",
@ -2605,7 +2603,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "2spot0",
@ -2672,7 +2670,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot0",
@ -2738,7 +2736,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "1upfront",
@ -2804,7 +2802,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "1upfront",
@ -2871,7 +2869,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "1upfront",
@ -2924,7 +2922,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "s",
@ -2977,7 +2975,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "t",
@ -3030,7 +3028,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "t",
@ -3101,7 +3099,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "t",
@ -3148,7 +3146,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "t",
@ -3194,7 +3192,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "o",
@ -3268,7 +3266,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
@ -3356,7 +3354,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
@ -3405,7 +3403,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
@ -3459,7 +3457,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
withDuplicateResults(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
@ -3516,7 +3514,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
withDuplicateResults(
Collections.singletonList(
ImmutableMap.of(
@ -3564,7 +3562,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")
@ -3642,7 +3640,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
)
.context(ImmutableMap.of(QueryContexts.FINALIZE_KEY, true, QueryContexts.BY_SEGMENT_KEY, true))
.build();
TopNResultValue topNResult = new TopNResultValue(
TopNResultValue topNResult = TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")
@ -3714,7 +3712,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"market", "spot",
@ -3776,7 +3774,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"dayOfWeek", "Wednesday",
@ -3832,7 +3830,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
map
)
@ -3880,7 +3878,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
map
)
@ -3911,7 +3909,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
map,
ImmutableMap.<String, Object>of(
@ -3949,7 +3947,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
map
)
@ -3976,7 +3974,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
ImmutableMap.<String, Object>of(
"partial_null_column", "value",
@ -4009,7 +4007,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
withDuplicateResults(
Arrays.asList(
ImmutableMap.of(
@ -4048,7 +4046,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
withDuplicateResults(
Arrays.asList(
ImmutableMap.of(
@ -4099,7 +4097,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.of(
QueryRunnerTestHelper.MARKET_DIMENSION, "spot",
@ -4170,7 +4168,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
map
)
@ -4237,7 +4235,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
map
)
@ -4277,7 +4275,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", 1000.0f)
@ -4352,7 +4350,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", "super-1000")
@ -4424,7 +4422,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("qf_alias", "14000.0")
@ -4496,7 +4494,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1400L)
@ -4569,7 +4567,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1400L)
@ -4631,7 +4629,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("quality", "mezzanine")
@ -4681,7 +4679,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
"vc", "spot spot",
@ -4741,7 +4739,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("ql_alias", "super-1400")
@ -4813,7 +4811,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("ql_alias", "1400")
@ -4885,7 +4883,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("qns_alias", 140000L)
@ -4957,7 +4955,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("qns_alias", 140000.0f)
@ -5029,7 +5027,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("time_alias", 1296345600000L)
@ -5089,7 +5087,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", 59L)
@ -5125,7 +5123,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("__time_alias", DateTimes.of("2011-01-12T00:00:00.000Z").getMillis())
@ -5164,7 +5162,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(Collections.singletonList(nullAliasMap))
TopNResultValue.create(Collections.singletonList(nullAliasMap))
)
);
assertExpectedResults(expectedResults, query);
@ -5185,7 +5183,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", 59.021022d)
@ -5236,7 +5234,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("time_alias", "super-1296345600000")
@ -5324,7 +5322,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
expectedMap
)
@ -5363,7 +5361,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("alias", 9L)
@ -5441,7 +5439,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("qns_alias", 140000L)
@ -5510,7 +5508,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("ql_alias", 1400L)
@ -5637,7 +5635,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(rows)
TopNResultValue.create(rows)
)
);
assertExpectedResults(expectedResults, query);
@ -5672,7 +5670,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(Collections.emptyList())
TopNResultValue.create(Collections.emptyList())
)
);
assertExpectedResults(expectedResults, query);
@ -5724,7 +5722,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", 97L)
@ -5792,7 +5790,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", 97L)
@ -5860,7 +5858,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-02T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.<String, Object>builder()
.put("index_alias", 97L)
@ -5929,7 +5927,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
makeRowWithNulls("dim", NullHandling.defaultLongValue(), "count", 279L),
makeRowWithNulls("dim", 10L, "count", 93L),
@ -5961,7 +5959,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
makeRowWithNulls("dim", NullHandling.defaultDoubleValue(), "count", 279L),
makeRowWithNulls("dim", 10.0, "count", 93L),
@ -5993,7 +5991,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
makeRowWithNulls("dim", NullHandling.defaultFloatValue(), "count", 279L),
makeRowWithNulls("dim", 10.0f, "count", 93L),
@ -6095,7 +6093,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "spot")
@ -6168,7 +6166,7 @@ public class TopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "spot")

View File

@ -43,6 +43,6 @@ public class TopNQueryRunnerTestHelper
}
expected.add(theVals);
}
return new Result<TopNResultValue>(DateTimes.of(date), new TopNResultValue(expected));
return new Result<TopNResultValue>(DateTimes.of(date), TopNResultValue.create(expected));
}
}

View File

@ -22,14 +22,12 @@ package org.apache.druid.query.topn;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.collections.CloseableStupidPool;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.Result;
import org.apache.druid.query.TestQueryRunners;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@ -40,7 +38,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -60,30 +57,7 @@ public class TopNUnionQueryTest extends InitializedNullHandlingTest
@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder()
{
final CloseableStupidPool<ByteBuffer> defaultPool = TestQueryRunners.createDefaultNonBlockingPool();
final CloseableStupidPool<ByteBuffer> customPool = new CloseableStupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(2000)
);
return QueryRunnerTestHelper.cartesian(
Iterables.concat(
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
defaultPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
),
QueryRunnerTestHelper.makeUnionQueryRunners(
new TopNQueryRunnerFactory(
customPool,
new TopNQueryQueryToolChest(new TopNQueryConfig()),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
)
);
return QueryRunnerTestHelper.transformToConstructionFeeder(TopNQueryRunnerTest.queryRunners());
}
private final QueryRunner runner;
@ -126,7 +100,7 @@ public class TopNUnionQueryTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put(QueryRunnerTestHelper.MARKET_DIMENSION, "total_market")

View File

@ -167,7 +167,7 @@ public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = ImmutableList.of(
new Result<>(
DateTimes.of("2020-04-02T00:00:00.000Z"),
new TopNResultValue(ImmutableList.of())
TopNResultValue.create(ImmutableList.of())
)
);
assertExpectedResultsWithCustomRunner(expectedResults, query, queryRunner);
@ -208,7 +208,7 @@ public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "a",
@ -287,7 +287,7 @@ public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "preferred",
@ -372,7 +372,7 @@ public class UnnestTopNQueryRunnerTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-04-01T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.of(
QueryRunnerTestHelper.PLACEMENTISH_DIMENSION_UNNEST, "spot",

View File

@ -155,7 +155,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -183,7 +183,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -290,7 +290,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "")
@ -318,7 +318,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "")
@ -427,7 +427,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -455,7 +455,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -559,7 +559,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -587,7 +587,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -677,7 +677,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -774,7 +774,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -802,7 +802,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -896,7 +896,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.singletonList(
QueryRunnerTestHelper.orderedMap(
"market", null,
@ -914,7 +914,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<SearchHit>emptyList()
)
)
@ -981,7 +981,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -1078,7 +1078,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
QueryRunnerTestHelper.orderedMap(
"market", null,
@ -1106,7 +1106,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -1209,7 +1209,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Arrays.asList(
new Result<TopNResultValue>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "")
@ -1246,7 +1246,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")
@ -1283,7 +1283,7 @@ public class SchemalessTestFullTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedFilteredTopNResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.<Map<String, Object>>asList(
ImmutableMap.<String, Object>builder()
.put("market", "spot")

View File

@ -196,7 +196,7 @@ public class SchemalessTestSimpleTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Arrays.asList(
new DimensionAndMetricValueExtractor(
ImmutableMap.<String, Object>builder()

View File

@ -261,7 +261,7 @@ public class TestHelper
final Object next = resultsIter.next();
final Object next2 = resultsIter2.next();
String failMsg = msg + "-" + index++;
String failMsg = msg + "[" + index++ + "]";
String failMsg2 = StringUtils.format(
"%s: Second iterator bad, multiple calls to iterator() should be safe",
failMsg
@ -366,9 +366,14 @@ public class TestHelper
final Map<String, Object> expectedMap = ((MapBasedRow) expected).getEvent();
final Map<String, Object> actualMap = ((MapBasedRow) actual).getEvent();
Assert.assertEquals(StringUtils.format("%s: map keys", msg), expectedMap.keySet(), actualMap.keySet());
for (final String key : expectedMap.keySet()) {
final Object expectedValue = expectedMap.get(key);
if (!actualMap.containsKey(key)) {
Assert.fail(
StringUtils.format("%s: Expected key [%s] to exist, but it did not [%s]", msg, key, actualMap.keySet())
);
}
final Object actualValue = actualMap.get(key);
if (expectedValue != null && expectedValue.getClass().isArray()) {
@ -388,6 +393,9 @@ public class TestHelper
);
}
}
// Given that we iterated through all of the keys in one, checking that the key exists in the other, then if they
// have the same size, they must have the same keyset.
Assert.assertEquals(expectedMap.size(), actualMap.size());
}
public static void assertRow(String msg, ResultRow expected, ResultRow actual)

View File

@ -372,7 +372,7 @@ public class DummyStringVirtualColumnTest extends InitializedNullHandlingTest
List<Result<TopNResultValue>> expectedRows = Collections.singletonList(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
TopNResultValue.create(
Collections.<Map<String, Object>>singletonList(
ImmutableMap.<String, Object>builder()
.put(COUNT, 1674L)

View File

@ -37,7 +37,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.FrameBasedInlineDataSource;
import org.apache.druid.query.FrameSignaturePair;
import org.apache.druid.query.GlobalTableDataSource;
@ -489,9 +489,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new SetAndVerifyContextQueryRunner<>(
final SetAndVerifyContextQueryRunner<T> baseRunner = new SetAndVerifyContextQueryRunner<>(
serverConfig,
new RetryQueryRunner<>(
baseClusterRunner,
@ -499,8 +497,10 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
retryConfig,
objectMapper
)
)
)
);
return FluentQueryRunner
.create(baseRunner, toolChest)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()

View File

@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -112,8 +112,8 @@ public class LocalQuerySegmentWalker implements QuerySegmentWalker
// Note: Not calling 'postProcess'; it isn't official/documented functionality so we'll only support it where
// it is already supported.
return new FluentQueryRunnerBuilder<>(queryRunnerFactory.getToolchest())
.create(scheduler.wrapQueryRunner(baseRunner))
return FluentQueryRunner
.create(scheduler.wrapQueryRunner(baseRunner), queryRunnerFactory.getToolchest())
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()

View File

@ -71,6 +71,7 @@ import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
@ -774,15 +775,12 @@ public class CachingClusteredClientTest
.threshold(3)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.granularity(Granularities.HOUR)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
);
QueryRunner runner = makeTopNQueryRunner();
testQueryCaching(
runner,
@ -853,10 +851,7 @@ public class CachingClusteredClientTest
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
);
QueryRunner runner = makeTopNQueryRunner();
testQueryCaching(
runner,
@ -878,7 +873,6 @@ public class CachingClusteredClientTest
.build();
TestHelper.assertExpectedResults(
makeRenamedTopNResults(
new DateTime("2011-11-04", TIMEZONE), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992,
new DateTime("2011-11-05", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
new DateTime("2011-11-06", TIMEZONE), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989,
@ -952,15 +946,13 @@ public class CachingClusteredClientTest
.threshold(3)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.granularity(Granularities.HOUR)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
);
QueryRunner runner = makeTopNQueryRunner();
testQueryCaching(
runner,
builder.randomQueryId().build(),
@ -1023,15 +1015,12 @@ public class CachingClusteredClientTest
.threshold(3)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.granularity(Granularities.HOUR)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(CONTEXT);
QueryRunner runner = new FinalizeResultsQueryRunner(
getDefaultQueryRunner(),
new TopNQueryQueryToolChest(new TopNQueryConfig())
);
QueryRunner runner = makeTopNQueryRunner();
testQueryCaching(
runner,
@ -1085,6 +1074,16 @@ public class CachingClusteredClientTest
);
}
@SuppressWarnings({"rawtypes", "unchecked"})
private FluentQueryRunner makeTopNQueryRunner()
{
return FluentQueryRunner
.create(getDefaultQueryRunner(), new TopNQueryQueryToolChest(new TopNQueryConfig()))
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration();
}
@Test
public void testSearchCaching()
{
@ -2283,7 +2282,8 @@ public class CachingClusteredClientTest
)
),
initializeResponseContext()
)
).toList(),
StringUtils.format("Run number [%d]", i)
);
if (queryCompletedCallback != null) {
queryCompletedCallback.run();
@ -2664,7 +2664,7 @@ public class CachingClusteredClientTest
index += 3;
}
retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
retVal.add(new Result<>(timestamp, TopNResultValue.create(values)));
}
return retVal;
}

View File

@ -487,7 +487,7 @@ public class CachingQueryRunnerTest
index += 3;
}
retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
retVal.add(new Result<>(timestamp, TopNResultValue.create(values)));
}
return retVal;
}

View File

@ -51,6 +51,7 @@ import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
@ -277,7 +278,7 @@ public class BackgroundCachePopulatorTest
index += 3;
}
retVal.add(new Result<>(timestamp, new TopNResultValue(values)));
retVal.add(new Result<>(timestamp, TopNResultValue.create(values)));
}
return retVal;
}

View File

@ -47,7 +47,7 @@ import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.FluentQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryPlus;
@ -769,6 +769,7 @@ public class QuerySchedulerTest
});
}
@SuppressWarnings({"rawtypes", "unchecked"})
private ListenableFuture<?> makeMergingQueryFuture(
ListeningExecutorService executorService,
QueryScheduler scheduler,
@ -783,14 +784,20 @@ public class QuerySchedulerTest
Assert.assertNotNull(scheduled);
FluentQueryRunnerBuilder fluentQueryRunnerBuilder = new FluentQueryRunnerBuilder(toolChest);
FluentQueryRunnerBuilder.FluentQueryRunner runner = fluentQueryRunnerBuilder.create((queryPlus, responseContext) -> {
FluentQueryRunner runner = FluentQueryRunner
.create(
(queryPlus, responseContext) -> {
Sequence<Integer> underlyingSequence = makeSequence(numRows);
Sequence<Integer> results = scheduler.run(scheduled, underlyingSequence);
return results;
});
return (Sequence) results;
},
toolChest
)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration();
final int actualNumRows = consumeAndCloseSequence(runner.mergeResults().run(QueryPlus.wrap(query)));
final int actualNumRows = consumeAndCloseSequence(runner.run(QueryPlus.wrap(query)));
Assert.assertEquals(actualNumRows, numRows);
}
catch (IOException ex) {