add CachingClusteredClient benchmark, refactor some stuff (#8089)

* add CachingClusteredClient benchmark, refactor some stuff

* revert WeightedServerSelectorStrategy to ConnectionCountServerSelectorStrategy and remove getWeight since felt artificial, default mergeResults in toolchest implementation for topn, search, select

* adjust javadoc

* adjustments

* oops

* use it

* use BinaryOperator, remove CombiningFunction, use Comparator instead of Ordering, other review adjustments

* rename createComparator to createResultComparator, fix typo, firstNonNull nullable parameters
This commit is contained in:
Clint Wylie 2019-07-18 13:16:28 -07:00 committed by GitHub
parent 72496d3712
commit 03e55d30eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
59 changed files with 1171 additions and 1798 deletions

View File

@ -0,0 +1,568 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.query;
import com.fasterxml.jackson.databind.InjectableValues.Std;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;
import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.data.input.Row;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.query.topn.TopNQuery;
import org.apache.druid.query.topn.TopNQueryBuilder;
import org.apache.druid.query.topn.TopNQueryConfig;
import org.apache.druid.query.topn.TopNQueryQueryToolChest;
import org.apache.druid.query.topn.TopNQueryRunnerFactory;
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.TimelineLookup;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
@Warmup(iterations = 15)
@Measurement(iterations = 30)
public class CachingClusteredClientBenchmark
{
private static final Logger LOG = new Logger(CachingClusteredClientBenchmark.class);
private static final int PROCESSING_BUFFER_SIZE = 10 * 1024 * 1024; // ~10MB
private static final String DATA_SOURCE = "ds";
public static final ObjectMapper JSON_MAPPER;
@Param({"8"})
private int numServers;
@Param({"4", "2", "1"})
private int numProcessingThreads;
@Param({"75000"})
private int rowsPerSegment;
@Param({"all"})
private String queryGranularity;
private QueryToolChestWarehouse toolChestWarehouse;
private QueryRunnerFactoryConglomerate conglomerate;
private CachingClusteredClient cachingClusteredClient;
private ExecutorService processingPool;
private Query query;
private final Closer closer = Closer.create();
private final BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
private final QuerySegmentSpec basicSchemaIntervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(basicSchema.getDataInterval())
);
static {
JSON_MAPPER = new DefaultObjectMapper();
JSON_MAPPER.setInjectableValues(
new Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), JSON_MAPPER)
.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT)
);
}
@Setup(Level.Trial)
public void setup()
{
final String schemaName = "basic";
BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
Map<DataSegment, QueryableIndex> queryableIndexes = new HashMap<>(numServers);
for (int i = 0; i < numServers; i++) {
final DataSegment dataSegment = DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(schemaInfo.getDataInterval())
.version("1")
.shardSpec(new LinearShardSpec(i))
.build();
final SegmentGenerator segmentGenerator = closer.register(new SegmentGenerator());
LOG.info("Starting benchmark setup using cacheDir[%s], rows[%,d].", segmentGenerator.getCacheDir(), rowsPerSegment);
final QueryableIndex index = segmentGenerator.generate(dataSegment, schemaInfo, Granularities.NONE, rowsPerSegment);
queryableIndexes.put(dataSegment, index);
}
final DruidProcessingConfig processingConfig = new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
@Override
public int intermediateComputeSizeBytes()
{
return PROCESSING_BUFFER_SIZE;
}
@Override
public int getNumMergeBuffers()
{
return 1;
}
@Override
public int getNumThreads()
{
return numProcessingThreads;
}
};
conglomerate = new DefaultQueryRunnerFactoryConglomerate(
ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
.put(
TimeseriesQuery.class,
new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
TopNQuery.class,
new TopNQueryRunnerFactory(
new StupidPool<>(
"TopNQueryRunnerFactory-bufferPool",
() -> ByteBuffer.allocate(PROCESSING_BUFFER_SIZE)
),
new TopNQueryQueryToolChest(
new TopNQueryConfig(),
QueryRunnerTestHelper.noopIntervalChunkingQueryRunnerDecorator()
),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
)
.put(
GroupByQuery.class,
makeGroupByQueryRunnerFactory(
GroupByQueryRunnerTest.DEFAULT_MAPPER,
new GroupByQueryConfig()
{
@Override
public String getDefaultStrategy()
{
return GroupByStrategySelector.STRATEGY_V2;
}
},
processingConfig
)
)
.build()
);
toolChestWarehouse = new QueryToolChestWarehouse()
{
@Override
public <T, QueryType extends Query<T>> QueryToolChest<T, QueryType> getToolChest(final QueryType query)
{
return conglomerate.findFactory(query).getToolchest();
}
};
SimpleServerView serverView = new SimpleServerView();
int serverSuffx = 1;
for (Entry<DataSegment, QueryableIndex> entry : queryableIndexes.entrySet()) {
serverView.addServer(
createServer(serverSuffx++),
entry.getKey(),
entry.getValue()
);
}
processingPool = Execs.multiThreaded(processingConfig.getNumThreads(), "caching-clustered-client-benchmark");
cachingClusteredClient = new CachingClusteredClient(
toolChestWarehouse,
serverView,
MapCache.create(0),
JSON_MAPPER,
new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig()
);
}
private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
final ObjectMapper mapper,
final GroupByQueryConfig config,
final DruidProcessingConfig processingConfig
)
{
final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final Supplier<ByteBuffer> bufferSupplier =
() -> ByteBuffer.allocateDirect(processingConfig.intermediateComputeSizeBytes());
final NonBlockingPool<ByteBuffer> bufferPool = new StupidPool<>(
"GroupByQueryEngine-bufferPool",
bufferSupplier
);
final BlockingPool<ByteBuffer> mergeBufferPool = new DefaultBlockingPool<>(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByStrategySelector strategySelector = new GroupByStrategySelector(
configSupplier,
new GroupByStrategyV1(
configSupplier,
new GroupByQueryEngine(configSupplier, bufferPool),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
bufferPool
),
new GroupByStrategyV2(
processingConfig,
configSupplier,
bufferPool,
mergeBufferPool,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(
strategySelector,
QueryRunnerTestHelper.sameThreadIntervalChunkingQueryRunnerDecorator()
);
return new GroupByQueryRunnerFactory(
strategySelector,
toolChest
);
}
@TearDown(Level.Trial)
public void tearDown() throws IOException
{
closer.close();
processingPool.shutdown();
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void timeseriesQuery(Blackhole blackhole)
{
query = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.build();
final List<Result<TimeseriesResultValue>> results = runQuery();
for (Result<TimeseriesResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void topNQuery(Blackhole blackhole)
{
query = new TopNQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.dimension(new DefaultDimensionSpec("dimUniform", null))
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.metric("sumLongSequential")
.threshold(10_000) // we are primarily measuring 'broker' merge time, so collect a significant number of results
.build();
final List<Result<TopNResultValue>> results = runQuery();
for (Result<TopNResultValue> result : results) {
blackhole.consume(result);
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public void groupByQuery(Blackhole blackhole)
{
query = GroupByQuery
.builder()
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(basicSchemaIntervalSpec)
.setDimensions(
new DefaultDimensionSpec("dimUniform", null),
new DefaultDimensionSpec("dimZipf", null)
)
.setAggregatorSpecs(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.setGranularity(Granularity.fromString(queryGranularity))
.build();
final List<Row> results = runQuery();
for (Row result : results) {
blackhole.consume(result);
}
}
private <T> List<T> runQuery()
{
//noinspection unchecked
QueryRunner<T> theRunner = new FluentQueryRunnerBuilder<>(toolChestWarehouse.getToolChest(query))
.create(cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()))
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration();
//noinspection unchecked
Sequence<T> queryResult = theRunner.run(QueryPlus.wrap(query), new HashMap<>());
return queryResult.toList();
}
private class SimpleServerView implements TimelineServerView
{
private final TierSelectorStrategy tierSelectorStrategy = new HighestPriorityTierSelectorStrategy(
new RandomServerSelectorStrategy()
);
// server -> queryRunner
private final Map<DruidServer, SingleSegmentDruidServer> servers = new HashMap<>();
// segmentId -> serverSelector
private final Map<String, ServerSelector> selectors = new HashMap<>();
// dataSource -> version -> serverSelector
private final Map<String, VersionedIntervalTimeline<String, ServerSelector>> timelines = new HashMap<>();
void addServer(DruidServer server, DataSegment dataSegment, QueryableIndex queryableIndex)
{
servers.put(
server,
new SingleSegmentDruidServer(
server,
new SimpleQueryRunner(
conglomerate,
dataSegment.getId(),
queryableIndex
)
)
);
addSegmentToServer(server, dataSegment);
}
void addSegmentToServer(DruidServer server, DataSegment segment)
{
final ServerSelector selector = selectors.computeIfAbsent(
segment.getId().toString(),
k -> new ServerSelector(segment, tierSelectorStrategy)
);
selector.addServerAndUpdateSegment(servers.get(server), segment);
timelines.computeIfAbsent(segment.getDataSource(), k -> new VersionedIntervalTimeline<>(Ordering.natural()))
.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(selector));
}
@Nullable
@Override
public TimelineLookup<String, ServerSelector> getTimeline(DataSource dataSource)
{
final String table = Iterables.getOnlyElement(dataSource.getNames());
return timelines.get(table);
}
@Override
public List<ImmutableDruidServer> getDruidServers()
{
return Collections.emptyList();
}
@Override
public <T> QueryRunner<T> getQueryRunner(DruidServer server)
{
final SingleSegmentDruidServer queryableDruidServer = Preconditions.checkNotNull(servers.get(server), "server");
return (QueryRunner<T>) queryableDruidServer.getQueryRunner();
}
@Override
public void registerTimelineCallback(Executor exec, TimelineCallback callback)
{
// do nothing
}
@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
{
// do nothing
}
@Override
public void registerSegmentCallback(Executor exec, SegmentCallback callback)
{
// do nothing
}
}
private class SimpleQueryRunner implements QueryRunner<Object>
{
private final QueryRunnerFactoryConglomerate conglomerate;
private final QueryableIndexSegment segment;
public SimpleQueryRunner(QueryRunnerFactoryConglomerate conglomerate, SegmentId segmentId, QueryableIndex queryableIndex)
{
this.conglomerate = conglomerate;
this.segment = new QueryableIndexSegment(queryableIndex, segmentId);
}
@Override
public Sequence<Object> run(QueryPlus<Object> queryPlus, Map<String, Object> responseContext)
{
final QueryRunnerFactory factory = conglomerate.findFactory(queryPlus.getQuery());
//noinspection unchecked
return factory.getToolchest().preMergeQueryDecoration(
new FinalizeResultsQueryRunner<>(
new BySegmentQueryRunner<>(
segment.getId(),
segment.getDataInterval().getStart(),
factory.createRunner(segment)
),
factory.getToolchest()
)
).run(queryPlus, responseContext);
}
}
private class SingleSegmentDruidServer extends QueryableDruidServer<SimpleQueryRunner>
{
SingleSegmentDruidServer(DruidServer server, SimpleQueryRunner runner)
{
super(server, runner);
}
}
private static DruidServer createServer(int nameSuiffix)
{
return new DruidServer(
"server_" + nameSuiffix,
"127.0.0." + nameSuiffix,
null,
Long.MAX_VALUE,
ServerType.HISTORICAL,
"default",
0
);
}
}

View File

@ -35,6 +35,7 @@ java.lang.Math#random() @ Use ThreadLocalRandom.current()
java.util.regex.Pattern#matches(java.lang.String,java.lang.CharSequence) @ Use String.startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly
org.apache.commons.io.FileUtils#getTempDirectory() @ Use org.junit.rules.TemporaryFolder for tests instead
java.lang.Class#getCanonicalName() @ Class.getCanonicalName can return null for anonymous types, use Class.getName instead.
com.google.common.base.Objects#firstNonNull(java.lang.Object, java.lang.Object) @ Use org.apache.druid.common.guava.GuavaUtils#firstNonNull(java.lang.Object, java.lang.Object) instead (probably... the GuavaUtils method return object is nullable)
@defaultMessage Use Locale.ENGLISH
com.ibm.icu.text.DateFormatSymbols#<init>()

View File

@ -19,15 +19,16 @@
package org.apache.druid.collections;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.guava.MergeIterable;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import java.util.Comparator;
import java.util.Iterator;
import java.util.function.BinaryOperator;
/**
*/
public class CombiningIterable<InType> implements Iterable<InType>
public class CombiningIterable<T> implements Iterable<T>
{
/**
* Creates a CombiningIterable around a MergeIterable such that equivalent elements are thrown away
@ -38,50 +39,40 @@ public class CombiningIterable<InType> implements Iterable<InType>
*
* @param in An Iterable of Iterables to be merged
* @param comparator the Comparator to determine sort and equality
* @param <InType> Type of object
* @param <T> Type of object
* @return An Iterable that is the merge of all Iterables from in such that there is only one instance of
* equivalent objects.
*/
@SuppressWarnings("unchecked")
public static <InType> CombiningIterable<InType> createSplatted(
Iterable<? extends Iterable<InType>> in,
Comparator<InType> comparator
public static <T> CombiningIterable<T> createSplatted(
Iterable<? extends Iterable<T>> in,
Comparator<T> comparator
)
{
return create(
new MergeIterable<InType>(comparator, (Iterable<Iterable<InType>>) in),
new MergeIterable<>(comparator, (Iterable<Iterable<T>>) in),
comparator,
new BinaryFn<InType, InType, InType>()
{
@Override
public InType apply(InType arg1, InType arg2)
{
if (arg1 == null) {
return arg2;
}
return arg1;
}
}
GuavaUtils::firstNonNull
);
}
public static <InType> CombiningIterable<InType> create(
Iterable<InType> it,
Comparator<InType> comparator,
BinaryFn<InType, InType, InType> fn
public static <T> CombiningIterable<T> create(
Iterable<T> it,
Comparator<T> comparator,
BinaryOperator<T> fn
)
{
return new CombiningIterable<InType>(it, comparator, fn);
return new CombiningIterable<>(it, comparator, fn);
}
private final Iterable<InType> it;
private final Comparator<InType> comparator;
private final BinaryFn<InType, InType, InType> fn;
private final Iterable<T> it;
private final Comparator<T> comparator;
private final BinaryOperator<T> fn;
public CombiningIterable(
Iterable<InType> it,
Comparator<InType> comparator,
BinaryFn<InType, InType, InType> fn
Iterable<T> it,
Comparator<T> comparator,
BinaryOperator<T> fn
)
{
this.it = it;
@ -90,7 +81,7 @@ public class CombiningIterable<InType> implements Iterable<InType>
}
@Override
public Iterator<InType> iterator()
public Iterator<T> iterator()
{
return CombiningIterator.create(it.iterator(), comparator, fn);
}

View File

@ -21,33 +21,33 @@ package org.apache.druid.collections;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;
/**
*/
public class CombiningIterator<InType> implements Iterator<InType>
public class CombiningIterator<T> implements Iterator<T>
{
public static <InType> CombiningIterator<InType> create(
Iterator<InType> it,
Comparator<InType> comparator,
BinaryFn<InType, InType, InType> fn
public static <T> CombiningIterator<T> create(
Iterator<T> it,
Comparator<T> comparator,
BinaryOperator<T> fn
)
{
return new CombiningIterator<InType>(it, comparator, fn);
return new CombiningIterator<>(it, comparator, fn);
}
private final PeekingIterator<InType> it;
private final Comparator<InType> comparator;
private final BinaryFn<InType, InType, InType> fn;
private final PeekingIterator<T> it;
private final Comparator<T> comparator;
private final BinaryOperator<T> fn;
public CombiningIterator(
Iterator<InType> it,
Comparator<InType> comparator,
BinaryFn<InType, InType, InType> fn
Iterator<T> it,
Comparator<T> comparator,
BinaryOperator<T> fn
)
{
this.it = Iterators.peekingIterator(it);
@ -62,13 +62,13 @@ public class CombiningIterator<InType> implements Iterator<InType>
}
@Override
public InType next()
public T next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
InType res = null;
T res = null;
while (hasNext()) {
if (res == null) {

View File

@ -1,139 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.FunctionalIterator;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
/**
* An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
* that the input Iterators are provided in order. That is, it places an extra restriction in the input iterators.
*
* Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
* in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
* be provided in the order of the first element of each iterator.
*
* If this doesn't make sense, check out OrderedMergeIteratorTest.testScrewsUpOnOutOfOrderBeginningOfList()
*
* It places this extra restriction on the input data in order to implement an optimization that allows it to
* remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
*/
public class OrderedMergeIterator<T> implements Iterator<T>
{
private final PriorityQueue<PeekingIterator<T>> firstElementComparedPQueue;
private PeekingIterator<PeekingIterator<T>> iterOfIterators;
private final Comparator<T> comparator;
public OrderedMergeIterator(
final Comparator<T> comparator,
Iterator<Iterator<T>> iterators
)
{
this.comparator = comparator;
firstElementComparedPQueue = new PriorityQueue<PeekingIterator<T>>(
16,
new Comparator<PeekingIterator<T>>()
{
@Override
public int compare(PeekingIterator<T> lhs, PeekingIterator<T> rhs)
{
return comparator.compare(lhs.peek(), rhs.peek());
}
}
);
iterOfIterators = Iterators.peekingIterator(
FunctionalIterator.create(iterators)
.filter(
new Predicate<Iterator<T>>()
{
@Override
public boolean apply(Iterator<T> input)
{
return input.hasNext();
}
}
)
.transform(
new Function<Iterator<T>, PeekingIterator<T>>()
{
@Override
public PeekingIterator<T> apply(Iterator<T> input)
{
return Iterators.peekingIterator(input);
}
}
)
);
}
@Override
public boolean hasNext()
{
return !firstElementComparedPQueue.isEmpty() || iterOfIterators.hasNext();
}
@Override
public T next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
final PeekingIterator<T> littleIt;
if (!iterOfIterators.hasNext()) {
littleIt = firstElementComparedPQueue.remove();
} else if (firstElementComparedPQueue.isEmpty()) {
littleIt = iterOfIterators.next();
} else {
T pQueueValue = firstElementComparedPQueue.peek().peek();
T iterItersValue = iterOfIterators.peek().peek();
if (comparator.compare(pQueueValue, iterItersValue) <= 0) {
littleIt = firstElementComparedPQueue.remove();
} else {
littleIt = iterOfIterators.next();
}
}
T retVal = littleIt.next();
if (littleIt.hasNext()) {
firstElementComparedPQueue.add(littleIt);
}
return retVal;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}

View File

@ -1,212 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.base.Function;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.YieldingAccumulators;
import org.apache.druid.java.util.common.io.Closer;
import java.io.IOException;
import java.util.PriorityQueue;
/**
* An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
* that the input Iterators are provided in order. That is, it places an extra restriction in the input iterators.
*
* Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
* in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
* be provided in the order of the first element of each iterator.
*
* If this doesn't make sense, check out OrderedMergeSequenceTest.testScrewsUpOnOutOfOrderBeginningOfList()
*
* It places this extra restriction on the input data in order to implement an optimization that allows it to
* remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
*/
public class OrderedMergeSequence<T> implements Sequence<T>
{
private final Ordering<T> ordering;
private final Sequence<Sequence<T>> sequences;
public OrderedMergeSequence(
final Ordering<T> ordering,
Sequence<Sequence<T>> sequences
)
{
this.ordering = ordering;
this.sequences = sequences;
}
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
Yielder<OutType> yielder = null;
try {
yielder = toYielder(initValue, YieldingAccumulators.fromAccumulator(accumulator));
return yielder.get();
}
finally {
CloseQuietly.close(yielder);
}
}
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
PriorityQueue<Yielder<T>> pQueue = new PriorityQueue<Yielder<T>>(
32,
ordering.onResultOf(
new Function<Yielder<T>, T>()
{
@Override
public T apply(Yielder<T> input)
{
return input.get();
}
}
)
);
Yielder<Yielder<T>> oldDudeAtCrosswalk = sequences.toYielder(
null,
new YieldingAccumulator<Yielder<T>, Sequence<T>>()
{
@Override
public Yielder<T> accumulate(Yielder<T> accumulated, Sequence<T> in)
{
final Yielder<T> retVal = in.toYielder(
null,
new YieldingAccumulator<T, T>()
{
@Override
public T accumulate(T accumulated, T in)
{
yield();
return in;
}
}
);
if (retVal.isDone()) {
try {
retVal.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
return null;
} else {
yield();
}
return retVal;
}
}
);
return makeYielder(pQueue, oldDudeAtCrosswalk, initValue, accumulator);
}
private <OutType> Yielder<OutType> makeYielder(
final PriorityQueue<Yielder<T>> pQueue,
Yielder<Yielder<T>> oldDudeAtCrosswalk,
OutType initVal,
final YieldingAccumulator<OutType, T> accumulator
)
{
OutType retVal = initVal;
while (!accumulator.yielded() && (!pQueue.isEmpty() || !oldDudeAtCrosswalk.isDone())) {
Yielder<T> yielder;
if (oldDudeAtCrosswalk.isDone()) {
yielder = pQueue.remove();
} else if (pQueue.isEmpty()) {
yielder = oldDudeAtCrosswalk.get();
oldDudeAtCrosswalk = oldDudeAtCrosswalk.next(null);
} else {
Yielder<T> queueYielder = pQueue.peek();
Yielder<T> iterYielder = oldDudeAtCrosswalk.get();
if (ordering.compare(queueYielder.get(), iterYielder.get()) <= 0) {
yielder = pQueue.remove();
} else {
yielder = oldDudeAtCrosswalk.get();
oldDudeAtCrosswalk = oldDudeAtCrosswalk.next(null);
}
}
retVal = accumulator.accumulate(retVal, yielder.get());
yielder = yielder.next(null);
if (yielder.isDone()) {
try {
yielder.close();
}
catch (IOException e) {
throw new RuntimeException(e);
}
} else {
pQueue.add(yielder);
}
}
if (!accumulator.yielded()) {
return Yielders.done(retVal, oldDudeAtCrosswalk);
}
final OutType yieldVal = retVal;
final Yielder<Yielder<T>> finalOldDudeAtCrosswalk = oldDudeAtCrosswalk;
return new Yielder<OutType>()
{
@Override
public OutType get()
{
return yieldVal;
}
@Override
public Yielder<OutType> next(OutType initValue)
{
accumulator.reset();
return makeYielder(pQueue, finalOldDudeAtCrosswalk, initValue, accumulator);
}
@Override
public boolean isDone()
{
return false;
}
@Override
public void close() throws IOException
{
Closer closer = Closer.create();
while (!pQueue.isEmpty()) {
closer.register(pQueue.remove());
}
closer.close();
}
};
}
}

View File

@ -19,15 +19,15 @@
package org.apache.druid.common.guava;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import java.io.IOException;
import java.util.Comparator;
import java.util.function.BinaryOperator;
/**
*/
@ -35,21 +35,21 @@ public class CombiningSequence<T> implements Sequence<T>
{
public static <T> CombiningSequence<T> create(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
Comparator<T> ordering,
BinaryOperator<T> mergeFn
)
{
return new CombiningSequence<>(baseSequence, ordering, mergeFn);
}
private final Sequence<T> baseSequence;
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;
private final Comparator<T> ordering;
private final BinaryOperator<T> mergeFn;
private CombiningSequence(
Sequence<T> baseSequence,
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn
Comparator<T> ordering,
BinaryOperator<T> mergeFn
)
{
this.baseSequence = baseSequence;
@ -147,8 +147,8 @@ public class CombiningSequence<T> implements Sequence<T>
private static class CombiningYieldingAccumulator<OutType, T> extends YieldingAccumulator<T, T>
{
private final Ordering<T> ordering;
private final BinaryFn<T, T, T> mergeFn;
private final Comparator<T> ordering;
private final BinaryOperator<T> mergeFn;
private final YieldingAccumulator<OutType, T> accumulator;
private OutType retVal;
@ -156,8 +156,8 @@ public class CombiningSequence<T> implements Sequence<T>
private boolean accumulatedSomething = false;
CombiningYieldingAccumulator(
Ordering<T> ordering,
BinaryFn<T, T, T> mergeFn,
Comparator<T> ordering,
BinaryOperator<T> mergeFn,
YieldingAccumulator<OutType, T> accumulator
)
{

View File

@ -31,7 +31,8 @@ public class GuavaUtils
{
/**
* To fix semantic difference of Longs.tryParse() from Long.parseLong (Longs.tryParse() returns null for '+' started value)
* To fix semantic difference of Longs.tryParse() from Long.parseLong (Longs.tryParse() returns null for '+' started
* value)
*/
@Nullable
public static Long tryParseLong(@Nullable String string)
@ -62,4 +63,18 @@ public class GuavaUtils
return null;
}
/**
* If first argument is not null, return it, else return the other argument. Sort of like
* {@link com.google.common.base.Objects#firstNonNull(Object, Object)} except will not explode if both arguments are
* null.
*/
@Nullable
public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2)
{
if (arg1 == null) {
return arg2;
}
return arg1;
}
}

View File

@ -26,23 +26,22 @@ import java.util.Iterator;
*/
public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T>
{
private final IteratorMaker<T, IterType> maker;
public BaseSequence(
IteratorMaker<T, IterType> maker
)
public BaseSequence(IteratorMaker<T, IterType> maker)
{
this.maker = maker;
}
@Override
public <OutType> OutType accumulate(OutType initValue, final Accumulator<OutType, T> fn)
public <OutType> OutType accumulate(final OutType initValue, final Accumulator<OutType, T> fn)
{
IterType iterator = maker.make();
OutType accumulated = initValue;
try {
while (iterator.hasNext()) {
initValue = fn.accumulate(initValue, iterator.next());
accumulated = fn.accumulate(accumulated, iterator.next());
}
}
catch (Throwable t) {
@ -55,11 +54,14 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
throw t;
}
maker.cleanup(iterator);
return initValue;
return accumulated;
}
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
public <OutType> Yielder<OutType> toYielder(
final OutType initValue,
final YieldingAccumulator<OutType, T> accumulator
)
{
final IterType iterator = maker.make();
@ -78,7 +80,7 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
}
private <OutType> Yielder<OutType> makeYielder(
OutType initValue,
final OutType initValue,
final YieldingAccumulator<OutType, T> accumulator,
final IterType iter
)
@ -91,14 +93,7 @@ public class BaseSequence<T, IterType extends Iterator<T>> implements Sequence<T
if (!accumulator.yielded()) {
return Yielders.done(
retVal,
new Closeable()
{
@Override
public void close()
{
maker.cleanup(iter);
}
}
(Closeable) () -> maker.cleanup(iter)
);
}

View File

@ -1,27 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava.nary;
/**
*/
public interface BinaryFn<Type1, Type2, OutType>
{
OutType apply(Type1 arg1, Type2 arg2);
}

View File

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava.nary;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import java.util.Comparator;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* A SortedMergeIterator is an Iterator that combines two other Iterators into one.
*
* It assumes that the two Iterators are in sorted order and walks through them, passing their values to the
* BinaryFn in sorted order. If a value appears in one Iterator and not in the other, e.g. if the lhs has a value "1"
* and the rhs does not, the BinaryFn will be called with "1" for first argument and null for the second argument.
* Thus, the BinaryFn implementation *must* be aware of nulls.
*
*/
public class SortedMergeIterator<InType, OutType> implements Iterator<OutType>
{
public static <InType, OutType> SortedMergeIterator<InType, OutType> create(
Iterator<InType> lhs,
Iterator<InType> rhs,
Comparator<InType> comparator,
BinaryFn<InType, InType, OutType> fn
)
{
return new SortedMergeIterator<>(lhs, rhs, comparator, fn);
}
private final PeekingIterator<InType> lhs;
private final PeekingIterator<InType> rhs;
private final Comparator<InType> comparator;
private final BinaryFn<InType, InType, OutType> fn;
public SortedMergeIterator(
Iterator<InType> lhs,
Iterator<InType> rhs,
Comparator<InType> comparator,
BinaryFn<InType, InType, OutType> fn
)
{
this.lhs = Iterators.peekingIterator(lhs);
this.rhs = Iterators.peekingIterator(rhs);
this.comparator = comparator;
this.fn = fn;
}
@Override
public boolean hasNext()
{
return lhs.hasNext() || rhs.hasNext();
}
@Override
public OutType next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
if (!lhs.hasNext()) {
return fn.apply(null, rhs.next());
}
if (!rhs.hasNext()) {
return fn.apply(lhs.next(), null);
}
int compared = comparator.compare(lhs.peek(), rhs.peek());
if (compared < 0) {
return fn.apply(lhs.next(), null);
}
if (compared == 0) {
return fn.apply(lhs.next(), rhs.next());
}
return fn.apply(null, rhs.next());
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.collections;
import com.google.common.collect.PeekingIterator;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
@ -29,12 +28,13 @@ import org.junit.Test;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.function.BinaryOperator;
public class CombiningIteratorTest
{
private CombiningIterator<String> testingIterator;
private Comparator<String> comparator;
private BinaryFn binaryFn;
private BinaryOperator<String> combiningFunction;
private PeekingIterator<String> peekIterator;
@Before
@ -42,8 +42,8 @@ public class CombiningIteratorTest
{
peekIterator = EasyMock.createMock(PeekingIterator.class);
comparator = EasyMock.createMock(Comparator.class);
binaryFn = EasyMock.createMock(BinaryFn.class);
testingIterator = CombiningIterator.create(peekIterator, comparator, binaryFn);
combiningFunction = EasyMock.createMock(BinaryOperator.class);
testingIterator = CombiningIterator.create(peekIterator, comparator, combiningFunction);
}
@After
@ -84,19 +84,19 @@ public class CombiningIteratorTest
String defaultString = "S1";
String resString = "S2";
EasyMock.expect(peekIterator.next()).andReturn(defaultString);
EasyMock.expect(binaryFn.apply(EasyMock.eq(defaultString), EasyMock.isNull()))
EasyMock.expect(combiningFunction.apply(EasyMock.eq(defaultString), EasyMock.isNull()))
.andReturn(resString);
EasyMock.expect(peekIterator.next()).andReturn(defaultString);
EasyMock.expect(comparator.compare(EasyMock.eq(resString), EasyMock.eq(defaultString)))
.andReturn(0);
EasyMock.expect(peekIterator.next()).andReturn(defaultString);
EasyMock.expect(binaryFn.apply(EasyMock.eq(resString), EasyMock.eq(defaultString)))
EasyMock.expect(combiningFunction.apply(EasyMock.eq(resString), EasyMock.eq(defaultString)))
.andReturn(resString);
EasyMock.expect(comparator.compare(EasyMock.eq(resString), EasyMock.eq(defaultString)))
.andReturn(1);
EasyMock.replay(peekIterator);
EasyMock.replay(binaryFn);
EasyMock.replay(combiningFunction);
EasyMock.replay(comparator);
String actual = testingIterator.next();
@ -104,7 +104,7 @@ public class CombiningIteratorTest
EasyMock.verify(peekIterator);
EasyMock.verify(comparator);
EasyMock.verify(binaryFn);
EasyMock.verify(combiningFunction);
}
@Test(expected = NoSuchElementException.class)

View File

@ -1,190 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
*/
public class OrderedMergeIteratorTest
{
@Test
public void testSanity()
{
final ArrayList<Iterator<Integer>> iterators = new ArrayList<>();
iterators.add(Arrays.asList(1, 3, 5, 7, 9).iterator());
iterators.add(Arrays.asList(2, 8).iterator());
iterators.add(Arrays.asList(4, 6, 8).iterator());
OrderedMergeIterator<Integer> iter = new OrderedMergeIterator<Integer>(
Ordering.natural(),
iterators.iterator()
);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9), Lists.newArrayList(iter));
}
@Test
public void testScrewsUpOnOutOfOrderBeginningOfList()
{
final ArrayList<Iterator<Integer>> iterators = new ArrayList<>();
iterators.add(Arrays.asList(1, 3, 5, 7, 9).iterator());
iterators.add(Arrays.asList(4, 6).iterator());
iterators.add(Arrays.asList(2, 8).iterator());
OrderedMergeIterator<Integer> iter = new OrderedMergeIterator<Integer>(
Ordering.natural(),
iterators.iterator()
);
Assert.assertEquals(Arrays.asList(1, 3, 4, 2, 5, 6, 7, 8, 9), Lists.newArrayList(iter));
}
@Test
public void testScrewsUpOnOutOfOrderInList()
{
final ArrayList<Iterator<Integer>> iterators = new ArrayList<>();
iterators.add(Arrays.asList(1, 3, 5, 4, 7, 9).iterator());
iterators.add(Arrays.asList(2, 8).iterator());
iterators.add(Arrays.asList(4, 6).iterator());
OrderedMergeIterator<Integer> iter = new OrderedMergeIterator<Integer>(
Ordering.natural(),
iterators.iterator()
);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 4, 6, 7, 8, 9), Lists.newArrayList(iter));
}
@Test
public void testLaziness()
{
final boolean[] done = new boolean[]{false, false};
final ArrayList<Iterator<Integer>> iterators = new ArrayList<>();
iterators.add(
new IteratorShell<Integer>(Arrays.asList(1, 2, 3).iterator())
{
@Override
public boolean hasNext()
{
boolean retVal = super.hasNext();
if (!retVal) {
done[0] = true;
}
return retVal;
}
}
);
iterators.add(
new IteratorShell<Integer>(Arrays.asList(4, 5, 6).iterator())
{
int count = 0;
@Override
public boolean hasNext()
{
if (count >= 1) {
Assert.assertTrue("First iterator not complete", done[0]);
}
boolean retVal = super.hasNext();
if (!retVal) {
done[1] = true;
}
return retVal;
}
@Override
public Integer next()
{
if (count >= 1) {
Assert.assertTrue("First iterator not complete", done[0]);
}
++count;
return super.next();
}
}
);
iterators.add(
new IteratorShell<Integer>(Arrays.asList(7, 8, 9).iterator())
{
int count = 0;
@Override
public boolean hasNext()
{
if (count >= 1) {
Assert.assertTrue("Second iterator not complete", done[1]);
}
Assert.assertTrue("First iterator not complete", done[0]);
return super.hasNext();
}
@Override
public Integer next()
{
if (count >= 1) {
Assert.assertTrue("Second iterator not complete", done[1]);
}
Assert.assertTrue("First iterator not complete", done[0]);
++count;
return super.next();
}
}
);
OrderedMergeIterator<Integer> iter = new OrderedMergeIterator<Integer>(
Ordering.natural(),
iterators.iterator()
);
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9), Lists.newArrayList(iter));
}
@Test(expected = NoSuchElementException.class)
public void testNoElementInNext()
{
OrderedMergeIterator<Integer> iter = new OrderedMergeIterator<>(
Ordering.natural(),
Collections.emptyIterator()
);
iter.next();
}
@Test(expected = UnsupportedOperationException.class)
public void testRemove()
{
OrderedMergeIterator<Integer> iter = new OrderedMergeIterator<>(
Ordering.natural(),
Collections.emptyIterator()
);
iter.remove();
}
}

View File

@ -1,350 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.collections;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceTestHelper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.TestSequence;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
*/
public class OrderedMergeSequenceTest
{
@Test
public void testSanity() throws Exception
{
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
TestSequence.create(1, 3, 5, 7, 9),
TestSequence.create(2, 8),
TestSequence.create(4, 6, 8)
);
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.natural(), testSequences);
SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 9));
for (TestSequence<Integer> sequence : testSequences) {
Assert.assertTrue(sequence.isClosed());
}
}
@Test
public void testMergeEmptySequence() throws Exception
{
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
TestSequence.create(ImmutableList.of()),
TestSequence.create(2, 8),
TestSequence.create(4, 6, 8)
);
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.natural(), testSequences);
SequenceTestHelper.testAll(seq, Arrays.asList(2, 4, 6, 8, 8));
for (TestSequence<Integer> sequence : testSequences) {
Assert.assertTrue(sequence.isClosed());
}
}
@Test
public void testMergeEmptySequenceAtEnd() throws Exception
{
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
TestSequence.create(2, 8),
TestSequence.create(4, 6, 8),
TestSequence.create(ImmutableList.of())
);
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.natural(), testSequences);
SequenceTestHelper.testAll(seq, Arrays.asList(2, 4, 6, 8, 8));
for (TestSequence<Integer> sequence : testSequences) {
Assert.assertTrue(sequence.isClosed());
}
}
@Test
public void testMergeEmptySequenceMiddle() throws Exception
{
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
TestSequence.create(2, 8),
TestSequence.create(ImmutableList.of()),
TestSequence.create(4, 6, 8)
);
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.natural(), testSequences);
SequenceTestHelper.testAll(seq, Arrays.asList(2, 4, 6, 8, 8));
for (TestSequence<Integer> sequence : testSequences) {
Assert.assertTrue(sequence.isClosed());
}
}
@Test
public void testScrewsUpOnOutOfOrderBeginningOfList() throws Exception
{
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
TestSequence.create(1, 3, 5, 7, 9),
TestSequence.create(4, 6, 8),
TestSequence.create(2, 8)
);
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.natural(), testSequences);
SequenceTestHelper.testAll(seq, Arrays.asList(1, 3, 4, 2, 5, 6, 7, 8, 8, 9));
for (TestSequence<Integer> sequence : testSequences) {
Assert.assertTrue(sequence.isClosed());
}
}
@Test
public void testScrewsUpOnOutOfOrderInList() throws Exception
{
final ArrayList<TestSequence<Integer>> testSequences = Lists.newArrayList(
TestSequence.create(1, 3, 5, 4, 7, 9),
TestSequence.create(2, 8),
TestSequence.create(4, 6)
);
OrderedMergeSequence<Integer> seq = makeMergedSequence(Ordering.natural(), testSequences);
SequenceTestHelper.testAll(seq, Arrays.asList(1, 2, 3, 4, 5, 4, 6, 7, 8, 9));
for (TestSequence<Integer> sequence : testSequences) {
Assert.assertTrue(sequence.isClosed());
}
}
@Test
public void testLazinessAccumulation()
{
final ArrayList<Sequence<Integer>> sequences = makeSyncedSequences();
OrderedMergeSequence<Integer> seq = new OrderedMergeSequence<Integer>(
Ordering.natural(), Sequences.simple(sequences)
);
SequenceTestHelper.testAccumulation("", seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
}
@Test
public void testLazinessYielder() throws Exception
{
final ArrayList<Sequence<Integer>> sequences = makeSyncedSequences();
OrderedMergeSequence<Integer> seq = new OrderedMergeSequence<Integer>(
Ordering.natural(), Sequences.simple(sequences)
);
SequenceTestHelper.testYield("", seq, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));
}
private ArrayList<Sequence<Integer>> makeSyncedSequences()
{
final boolean[] done = new boolean[]{false, false};
final ArrayList<Sequence<Integer>> sequences = new ArrayList<>();
sequences.add(
new BaseSequence<Integer, Iterator<Integer>>(
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
{
@Override
public Iterator<Integer> make()
{
return Arrays.asList(1, 2, 3).iterator();
}
@Override
public void cleanup(Iterator<Integer> iterFromMake)
{
done[0] = true;
}
}
)
);
sequences.add(
new BaseSequence<Integer, Iterator<Integer>>(
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
{
@Override
public Iterator<Integer> make()
{
return new IteratorShell<Integer>(Arrays.asList(4, 5, 6).iterator())
{
int count = 0;
@Override
public boolean hasNext()
{
if (count >= 1) {
Assert.assertTrue("First iterator not complete", done[0]);
}
return super.hasNext();
}
@Override
public Integer next()
{
if (count >= 1) {
Assert.assertTrue("First iterator not complete", done[0]);
}
++count;
return super.next();
}
};
}
@Override
public void cleanup(Iterator<Integer> iterFromMake)
{
done[1] = true;
}
}
)
);
sequences.add(
new BaseSequence<Integer, Iterator<Integer>>(
new BaseSequence.IteratorMaker<Integer, Iterator<Integer>>()
{
@Override
public Iterator<Integer> make()
{
return new IteratorShell<Integer>(Arrays.asList(7, 8, 9).iterator())
{
int count = 0;
@Override
public boolean hasNext()
{
if (count >= 1) {
Assert.assertTrue("Second iterator not complete", done[1]);
}
Assert.assertTrue("First iterator not complete", done[0]);
return super.hasNext();
}
@Override
public Integer next()
{
if (count >= 1) {
Assert.assertTrue("Second iterator not complete", done[1]);
}
Assert.assertTrue("First iterator not complete", done[0]);
++count;
return super.next();
}
};
}
@Override
public void cleanup(Iterator<Integer> iterFromMake)
{
}
}
)
);
return sequences;
}
private <T> OrderedMergeSequence<T> makeMergedSequence(
Ordering<T> ordering,
List<TestSequence<T>> seqs
)
{
return new OrderedMergeSequence<T>(
ordering,
Sequences.simple((List<Sequence<T>>) (List) seqs)
);
}
private <T> MergeSequence<T> makeUnorderedMergedSequence(
Ordering<T> ordering,
List<TestSequence<T>> seqs
)
{
return new MergeSequence<T>(ordering, Sequences.simple(seqs));
}
@Test
public void testHierarchicalMerge() throws Exception
{
final Sequence<Integer> seq1 = makeUnorderedMergedSequence(
Ordering.natural(),
Collections.singletonList(TestSequence.create(1))
);
final Sequence<Integer> seq2 = makeUnorderedMergedSequence(
Ordering.natural(),
Collections.singletonList(TestSequence.create(1))
);
final OrderedMergeSequence<Integer> finalMerged = new OrderedMergeSequence<Integer>(
Ordering.natural(),
Sequences.simple(
Lists.newArrayList(seq1, seq2)
)
);
SequenceTestHelper.testAll(finalMerged, Arrays.asList(1, 1));
}
@Test
public void testMergeMerge() throws Exception
{
final Sequence<Integer> seq1 = makeUnorderedMergedSequence(
Ordering.natural(),
Collections.singletonList(TestSequence.create(1))
);
final OrderedMergeSequence<Integer> finalMerged = new OrderedMergeSequence<Integer>(
Ordering.natural(),
Sequences.simple(
Collections.singletonList(seq1)
)
);
SequenceTestHelper.testAll(finalMerged, Collections.singletonList(1));
}
@Test
public void testOne() throws Exception
{
final MergeSequence<Integer> seq1 = makeUnorderedMergedSequence(
Ordering.natural(),
Collections.singletonList(TestSequence.create(1))
);
SequenceTestHelper.testAll(seq1, Collections.singletonList(1));
}
}

View File

@ -25,16 +25,29 @@ import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.YieldingAccumulator;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
public class ComplexSequenceTest
{
// Integer::sum with more nulls
private static final BinaryOperator<Integer> PLUS_NULLABLE = (arg1, arg2) -> {
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
return arg1 + arg2;
};
@Test
public void testComplexSequence()
{
@ -80,28 +93,11 @@ public class ComplexSequenceTest
private Sequence<Integer> combine(Sequence<Integer> sequence)
{
return CombiningSequence.create(sequence, Comparators.alwaysEqual(), plus);
return CombiningSequence.create(sequence, Comparators.alwaysEqual(), PLUS_NULLABLE);
}
private Sequence<Integer> concat(Sequence<Integer>... sequences)
{
return Sequences.concat(Arrays.asList(sequences));
}
private final BinaryFn<Integer, Integer, Integer> plus = new BinaryFn<Integer, Integer, Integer>()
{
@Override
public Integer apply(Integer arg1, Integer arg2)
{
if (arg1 == null) {
return arg2;
}
if (arg2 == null) {
return arg1;
}
return arg1 + arg2;
}
};
}

View File

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava.nary;
import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Comparator;
/**
*/
public class SortedMergeIteratorTest
{
@Test
public void testSanity()
{
SortedMergeIterator<Integer, Integer> iter = SortedMergeIterator.create(
Arrays.asList(1, 4, 5, 7, 9).iterator(),
Arrays.asList(1, 2, 3, 6, 7, 8, 9, 10, 11).iterator(),
Comparator.naturalOrder(),
new BinaryFn<Integer, Integer, Integer>()
{
@Override
public Integer apply(Integer arg1, Integer arg2)
{
return arg1 == null ? arg2 : arg2 == null ? arg1 : arg1 + arg2;
}
}
);
Assert.assertEquals(
Arrays.asList(2, 2, 3, 4, 5, 6, 14, 8, 18, 10, 11),
Lists.newArrayList(iter)
);
}
}

View File

@ -122,11 +122,9 @@ public class DistinctCountTopNQueryTest
.metric("UV")
.threshold(10)
.aggregators(
Lists.newArrayList(
QueryRunnerTestHelper.rowsCount,
new DistinctCountAggregatorFactory("UV", visitor_id, null)
)
)
.build();
final Iterable<Result<TopNResultValue>> results =

View File

@ -208,9 +208,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase
.metric("cost")
.threshold(4)
.intervals("2011-04-01/2011-04-06")
.aggregators(
Collections.singletonList(new LongSumAggregatorFactory("cost", "cost"))
)
.aggregators(new LongSumAggregatorFactory("cost", "cost"))
.build();
List<Query> expectedQueryAfterOptimizing = Lists.newArrayList(
@ -221,9 +219,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase
.metric("cost")
.threshold(4)
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-01/2011-04-04"))))
.aggregators(
Collections.singletonList(new LongSumAggregatorFactory("cost", "cost"))
)
.aggregators(new LongSumAggregatorFactory("cost", "cost"))
.build(),
new TopNQueryBuilder()
.dataSource("base")
@ -232,9 +228,7 @@ public class DatasourceOptimizerTest extends CuratorTestBase
.metric("cost")
.threshold(4)
.intervals(new MultipleIntervalSegmentSpec(Collections.singletonList(Intervals.of("2011-04-04/2011-04-06"))))
.aggregators(
Collections.singletonList(new LongSumAggregatorFactory("cost", "cost"))
)
.aggregators(new LongSumAggregatorFactory("cost", "cost"))
.build()
);
Assert.assertEquals(expectedQueryAfterOptimizing, optimizer.optimize(userQuery));

View File

@ -40,7 +40,6 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
public class MaterializedViewQueryTest
{
@ -80,7 +79,7 @@ public class MaterializedViewQueryTest
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
MaterializedViewQuery query = new MaterializedViewQuery(topNQuery, optimizer);
String json = jsonMapper.writeValueAsString(query);

View File

@ -142,12 +142,10 @@ public class ApproximateHistogramTopNQueryTest
)
)
.postAggregators(
Arrays.asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg,
new QuantilePostAggregator("quantile", "apphisto", 0.5f)
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(

View File

@ -142,12 +142,10 @@ public class FixedBucketsHistogramTopNQueryTest
)
)
.postAggregators(
Arrays.asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg,
new QuantilePostAggregator("quantile", "histo", 0.5f)
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(

View File

@ -85,11 +85,11 @@ public class VarianceTopNQueryTest
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Splitter;
@ -39,7 +38,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
@ -73,7 +71,6 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@ -573,28 +570,10 @@ public class DeterminePartitionsJob implements Jobby
return new CombiningIterable<>(
Iterables.transform(
input,
new Function<Text, DimValueCount>()
{
@Override
public DimValueCount apply(Text input)
{
return DimValueCount.fromText(input);
}
}
DimValueCount::fromText
),
new Comparator<DimValueCount>()
{
@Override
public int compare(DimValueCount o1, DimValueCount o2)
{
return ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result();
}
},
new BinaryFn<DimValueCount, DimValueCount, DimValueCount>()
{
@Override
public DimValueCount apply(DimValueCount arg1, DimValueCount arg2)
{
(o1, o2) -> ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result(),
(arg1, arg2) -> {
if (arg2 == null) {
return arg1;
}
@ -603,7 +582,6 @@ public class DeterminePartitionsJob implements Jobby
final long newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
return new DimValueCount(arg1.dim, arg1.value, newNumRows);
}
}
);
}
}

View File

@ -24,11 +24,14 @@ import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Function;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.timeline.LogicalSegment;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
/**
* The broker-side (also used by server in some cases) API for a specific Query type.
@ -77,11 +80,38 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
* ResultType objects in time order (ascending or descending). This method should return a new QueryRunner that
* potentially merges the stream of ordered ResultType objects.
*
* A default implementation constructs a {@link ResultMergeQueryRunner} which creates a
* {@link org.apache.druid.common.guava.CombiningSequence} using the supplied {@link QueryRunner} with
* {@link QueryToolChest#createResultComparator(Query)} and {@link QueryToolChest#createMergeFn(Query)}} supplied by this
* toolchest.
*
* @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending or descending)
*
* @return a QueryRunner that potentially merges the stream of ordered ResultType objects
*/
public abstract QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner);
public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner)
{
return new ResultMergeQueryRunner<>(runner, this::createResultComparator, this::createMergeFn);
}
/**
* Creates a merge function that is used to merge intermediate aggregates from historicals in broker. This merge
* function is used in the default {@link ResultMergeQueryRunner} provided by
* {@link QueryToolChest#mergeResults(QueryRunner)} and can be used in additional future merge implementations
*/
public BinaryOperator<ResultType> createMergeFn(Query<ResultType> query)
{
throw new UOE("%s doesn't provide a merge function", query.getClass().getName());
}
/**
* Creates an ordering comparator that is used to order results. This comparator is used in the default
* {@link ResultMergeQueryRunner} provided by {@link QueryToolChest#mergeResults(QueryRunner)}
*/
public Comparator<ResultType> createResultComparator(Query<ResultType> query)
{
throw new UOE("%s doesn't provide a result comparator", query.getClass().getName());
}
/**
* Creates a {@link QueryMetrics} object that is used to generate metrics for this specific query type. This exists

View File

@ -19,34 +19,41 @@
package org.apache.druid.query;
import com.google.common.collect.Ordering;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import java.util.Comparator;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
/**
*/
@PublicApi
public abstract class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
public class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
{
private final Function<Query<T>, Comparator<T>> comparatorGenerator;
private final Function<Query<T>, BinaryOperator<T>> mergeFnGenerator;
public ResultMergeQueryRunner(
QueryRunner<T> baseRunner
QueryRunner<T> baseRunner,
Function<Query<T>, Comparator<T>> comparatorGenerator,
Function<Query<T>, BinaryOperator<T>> mergeFnGenerator
)
{
super(baseRunner);
this.comparatorGenerator = comparatorGenerator;
this.mergeFnGenerator = mergeFnGenerator;
}
@Override
public Sequence<T> doRun(QueryRunner<T> baseRunner, QueryPlus<T> queryPlus, Map<String, Object> context)
{
Query<T> query = queryPlus.getQuery();
return CombiningSequence.create(baseRunner.run(queryPlus, context), makeOrdering(query), createMergeFn(query));
return CombiningSequence.create(
baseRunner.run(queryPlus, context),
comparatorGenerator.apply(query),
mergeFnGenerator.apply(query));
}
protected abstract Ordering<T> makeOrdering(Query<T> query);
protected abstract BinaryFn<T, T, T> createMergeFn(Query<T> query);
}

View File

@ -41,6 +41,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
@ -64,12 +65,14 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
/**
*/
@ -114,11 +117,7 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
@Override
public QueryRunner<Row> mergeResults(final QueryRunner<Row> runner)
{
return new QueryRunner<Row>()
{
@Override
public Sequence<Row> run(QueryPlus<Row> queryPlus, Map<String, Object> responseContext)
{
return (queryPlus, responseContext) -> {
if (QueryContexts.isBySegment(queryPlus.getQuery())) {
return runner.run(queryPlus, responseContext);
}
@ -128,10 +127,21 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
return initAndMergeGroupByResults(groupByQuery, runner, responseContext);
}
return runner.run(queryPlus, responseContext);
}
};
}
@Override
public BinaryOperator<Row> createMergeFn(Query<Row> query)
{
return strategySelector.strategize((GroupByQuery) query).createMergeFn(query);
}
@Override
public Comparator<Row> createResultComparator(Query<Row> query)
{
return strategySelector.strategize((GroupByQuery) query).createResultComparator(query);
}
private Sequence<Row> initAndMergeGroupByResults(
final GroupByQuery query,
QueryRunner<Row> runner,

View File

@ -23,15 +23,15 @@ import com.google.common.collect.Maps;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.joda.time.DateTime;
import java.util.Map;
import java.util.function.BinaryOperator;
public class GroupByBinaryFnV2 implements BinaryFn<Row, Row, Row>
public class GroupByBinaryFnV2 implements BinaryOperator<Row>
{
private final GroupByQuery query;
@ -50,8 +50,7 @@ public class GroupByBinaryFnV2 implements BinaryFn<Row, Row, Row>
}
final Map<String, Object> newMap = Maps.newHashMapWithExpectedSize(
query.getDimensions().size()
+ query.getAggregatorSpecs().size()
query.getDimensions().size() + query.getAggregatorSpecs().size()
);
// Add dimensions

View File

@ -21,8 +21,10 @@ package org.apache.druid.query.groupby.strategy;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQuery;
@ -30,8 +32,11 @@ import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.resource.GroupByQueryResource;
import org.apache.druid.segment.StorageAdapter;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.function.BinaryOperator;
public interface GroupByStrategy
{
@ -69,6 +74,26 @@ public interface GroupByStrategy
Sequence<Row> mergeResults(QueryRunner<Row> baseRunner, GroupByQuery query, Map<String, Object> responseContext);
/**
* See {@link org.apache.druid.query.QueryToolChest#createMergeFn(Query)} for details, allows
* {@link GroupByQueryQueryToolChest} to delegate implementation to the strategy
*/
@Nullable
default BinaryOperator<Row> createMergeFn(Query<Row> query)
{
throw new UOE("%s doesn't provide a merge function", this.getClass().getName());
}
/**
* See {@link org.apache.druid.query.QueryToolChest#createResultComparator(Query)}, allows
* {@link GroupByQueryQueryToolChest} to delegate implementation to the strategy
*/
@Nullable
default Comparator<Row> createResultComparator(Query<Row> queryParam)
{
throw new UOE("%s doesn't provide a result comparator", this.getClass().getName());
}
Sequence<Row> applyPostProcessing(Sequence<Row> results, GroupByQuery query);
Sequence<Row> processSubqueryResult(

View File

@ -26,7 +26,6 @@ import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import org.apache.druid.collections.BlockingPool;
@ -43,7 +42,6 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException;
@ -76,9 +74,11 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
public class GroupByStrategyV2 implements GroupByStrategy
@ -213,6 +213,18 @@ public class GroupByStrategyV2 implements GroupByStrategy
return runner;
}
@Override
public Comparator<Row> createResultComparator(Query<Row> queryParam)
{
return ((GroupByQuery) queryParam).getRowOrdering(true);
}
@Override
public BinaryOperator<Row> createMergeFn(Query<Row> queryParam)
{
return new GroupByBinaryFnV2((GroupByQuery) queryParam);
}
@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
@ -222,20 +234,11 @@ public class GroupByStrategyV2 implements GroupByStrategy
{
// Merge streams using ResultMergeQueryRunner, then apply postaggregators, then apply limit (which may
// involve materialization)
final ResultMergeQueryRunner<Row> mergingQueryRunner = new ResultMergeQueryRunner<Row>(baseRunner)
{
@Override
protected Ordering<Row> makeOrdering(Query<Row> queryParam)
{
return ((GroupByQuery) queryParam).getRowOrdering(true);
}
@Override
protected BinaryFn<Row, Row, Row> createMergeFn(Query<Row> queryParam)
{
return new GroupByBinaryFnV2((GroupByQuery) queryParam);
}
};
final ResultMergeQueryRunner<Row> mergingQueryRunner = new ResultMergeQueryRunner<>(
baseRunner,
this::createResultComparator,
this::createMergeFn
);
// Fudge timestamp, maybe.
final DateTime fudgeTimestamp = getUniversalTimestamp(query);

View File

@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.MappedSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.BySegmentSkippingQueryRunner;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
@ -67,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAnalysis, SegmentMetadataQuery>
{
@ -137,16 +137,9 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
return query.getResultOrdering(); // No two elements should be equal, so it should never merge
}
private BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis> createMergeFn(final SegmentMetadataQuery inQ)
private BinaryOperator<SegmentAnalysis> createMergeFn(final SegmentMetadataQuery inQ)
{
return new BinaryFn<SegmentAnalysis, SegmentAnalysis, SegmentAnalysis>()
{
@Override
public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
{
return mergeAnalyses(arg1, arg2, inQ.isLenientAggregatorMerge());
}
};
return (arg1, arg2) -> mergeAnalyses(arg1, arg2, inQ.isLenientAggregatorMerge());
}
};
}

View File

@ -23,17 +23,16 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.Result;
import org.joda.time.DateTime;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
/**
*/
public class SearchBinaryFn
implements BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>>
public class SearchBinaryFn implements BinaryOperator<Result<SearchResultValue>>
{
private final SearchSortSpec searchSortSpec;
private final Granularity gran;
@ -113,6 +112,6 @@ public class SearchBinaryFn
? arg1.getTimestamp()
: gran.bucketStart(arg1.getTimestamp());
return new Result<SearchResultValue>(timestamp, new SearchResultValue(results));
return new Result<>(timestamp, new SearchResultValue(results));
}
}

View File

@ -26,7 +26,6 @@ import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
@ -34,7 +33,6 @@ import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
@ -44,7 +42,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
@ -52,9 +49,11 @@ import org.apache.druid.query.filter.DimFilter;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
/**
*/
@ -95,30 +94,18 @@ public class SearchQueryQueryToolChest extends QueryToolChest<Result<SearchResul
}
@Override
public QueryRunner<Result<SearchResultValue>> mergeResults(
QueryRunner<Result<SearchResultValue>> runner
public BinaryOperator<Result<SearchResultValue>> createMergeFn(
Query<Result<SearchResultValue>> query
)
{
return new ResultMergeQueryRunner<Result<SearchResultValue>>(runner)
{
@Override
protected Ordering<Result<SearchResultValue>> makeOrdering(Query<Result<SearchResultValue>> query)
{
return ResultGranularTimestampComparator.create(
((SearchQuery) query).getGranularity(),
query.isDescending()
);
final SearchQuery searchQuery = (SearchQuery) query;
return new SearchBinaryFn(searchQuery.getSort(), searchQuery.getGranularity(), searchQuery.getLimit());
}
@Override
protected BinaryFn<Result<SearchResultValue>, Result<SearchResultValue>, Result<SearchResultValue>> createMergeFn(
Query<Result<SearchResultValue>> input
)
public Comparator<Result<SearchResultValue>> createResultComparator(Query<Result<SearchResultValue>> query)
{
SearchQuery query = (SearchQuery) input;
return new SearchBinaryFn(query.getSort(), query.getGranularity(), query.getLimit());
}
};
return ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending());
}
@Override

View File

@ -22,17 +22,16 @@ package org.apache.druid.query.select;
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.Result;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Set;
import java.util.function.BinaryOperator;
/**
*/
public class SelectBinaryFn
implements BinaryFn<Result<SelectResultValue>, Result<SelectResultValue>, Result<SelectResultValue>>
public class SelectBinaryFn implements BinaryOperator<Result<SelectResultValue>>
{
private final Granularity gran;
private final PagingSpec pagingSpec;

View File

@ -26,13 +26,11 @@ import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
@ -41,7 +39,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.filter.DimFilter;
@ -53,12 +50,14 @@ import org.joda.time.Interval;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BinaryOperator;
/**
*/
@ -100,33 +99,18 @@ public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResul
}
@Override
public QueryRunner<Result<SelectResultValue>> mergeResults(
QueryRunner<Result<SelectResultValue>> queryRunner
public BinaryOperator<Result<SelectResultValue>> createMergeFn(
Query<Result<SelectResultValue>> query
)
{
return new ResultMergeQueryRunner<Result<SelectResultValue>>(queryRunner)
{
@Override
protected Ordering<Result<SelectResultValue>> makeOrdering(Query<Result<SelectResultValue>> query)
{
return ResultGranularTimestampComparator.create(
((SelectQuery) query).getGranularity(), query.isDescending()
);
final SelectQuery selectQuery = (SelectQuery) query;
return new SelectBinaryFn(selectQuery.getGranularity(), selectQuery.getPagingSpec(), selectQuery.isDescending());
}
@Override
protected BinaryFn<Result<SelectResultValue>, Result<SelectResultValue>, Result<SelectResultValue>> createMergeFn(
Query<Result<SelectResultValue>> input
)
public Comparator<Result<SelectResultValue>> createResultComparator(Query<Result<SelectResultValue>> query)
{
SelectQuery query = (SelectQuery) input;
return new SelectBinaryFn(
query.getGranularity(),
query.getPagingSpec(),
query.isDescending()
);
}
};
return ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending());
}
@Override

View File

@ -21,18 +21,17 @@ package org.apache.druid.query.timeseries;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
/**
*/
public class TimeseriesBinaryFn
implements BinaryFn<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>, Result<TimeseriesResultValue>>
public class TimeseriesBinaryFn implements BinaryOperator<Result<TimeseriesResultValue>>
{
private final Granularity gran;
private final List<AggregatorFactory> aggregations;

View File

@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.DateTimes;
@ -35,7 +34,6 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
import org.apache.druid.query.Query;
@ -54,10 +52,12 @@ import org.apache.druid.query.groupby.RowBasedColumnSelectorFactory;
import org.joda.time.DateTime;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
/**
*/
@ -99,7 +99,10 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
)
{
final QueryRunner<Result<TimeseriesResultValue>> resultMergeQueryRunner = new ResultMergeQueryRunner<Result<TimeseriesResultValue>>(
queryRunner)
queryRunner,
this::createResultComparator,
this::createMergeFn
)
{
@Override
public Sequence<Result<TimeseriesResultValue>> doRun(
@ -120,26 +123,6 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
}
return result;
}
@Override
protected Ordering<Result<TimeseriesResultValue>> makeOrdering(Query<Result<TimeseriesResultValue>> query)
{
return ResultGranularTimestampComparator.create(
((TimeseriesQuery) query).getGranularity(), query.isDescending()
);
}
@Override
protected BinaryFn<Result<TimeseriesResultValue>, Result<TimeseriesResultValue>, Result<TimeseriesResultValue>> createMergeFn(
Query<Result<TimeseriesResultValue>> input
)
{
TimeseriesQuery query = (TimeseriesQuery) input;
return new TimeseriesBinaryFn(
query.getGranularity(),
query.getAggregatorSpecs()
);
}
};
return (queryPlus, responseContext) -> {
@ -211,6 +194,21 @@ public class TimeseriesQueryQueryToolChest extends QueryToolChest<Result<Timeser
};
}
@Override
public BinaryOperator<Result<TimeseriesResultValue>> createMergeFn(
Query<Result<TimeseriesResultValue>> query
)
{
TimeseriesQuery timeseriesQuery = (TimeseriesQuery) query;
return new TimeseriesBinaryFn(timeseriesQuery.getGranularity(), timeseriesQuery.getAggregatorSpecs());
}
@Override
public Comparator<Result<TimeseriesResultValue>> createResultComparator(Query<Result<TimeseriesResultValue>> query)
{
return ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending());
}
private Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQuery query)
{
List<AggregatorFactory> aggregatorSpecs = query.getAggregatorSpecs();

View File

@ -21,7 +21,6 @@ package org.apache.druid.query.topn;
import org.apache.druid.java.util.common.granularity.AllGranularity;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
@ -33,10 +32,11 @@ import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
/**
*/
public class TopNBinaryFn implements BinaryFn<Result<TopNResultValue>, Result<TopNResultValue>, Result<TopNResultValue>>
public class TopNBinaryFn implements BinaryOperator<Result<TopNResultValue>>
{
private final DimensionSpec dimSpec;
private final Granularity gran;

View File

@ -253,12 +253,24 @@ public class TopNQueryBuilder
return this;
}
public TopNQueryBuilder aggregators(AggregatorFactory... aggs)
{
aggregatorSpecs = Arrays.asList(aggs);
return this;
}
public TopNQueryBuilder postAggregators(Collection<PostAggregator> p)
{
postAggregatorSpecs = new ArrayList<>(p); // defensive copy
return this;
}
public TopNQueryBuilder postAggregators(PostAggregator... postAggs)
{
postAggregatorSpecs = Arrays.asList(postAggs);
return this;
}
public TopNQueryBuilder context(Map<String, Object> c)
{
context = c;

View File

@ -25,13 +25,11 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.BySegmentResultValue;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.IntervalChunkingQueryRunnerDecorator;
@ -42,7 +40,6 @@ import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.Result;
import org.apache.druid.query.ResultGranularTimestampComparator;
import org.apache.druid.query.ResultMergeQueryRunner;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.MetricManipulationFn;
@ -53,9 +50,11 @@ import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.joda.time.DateTime;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
/**
*/
@ -109,36 +108,25 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
}
@Override
public QueryRunner<Result<TopNResultValue>> mergeResults(
QueryRunner<Result<TopNResultValue>> runner
public BinaryOperator<Result<TopNResultValue>> createMergeFn(
Query<Result<TopNResultValue>> query
)
{
return new ResultMergeQueryRunner<Result<TopNResultValue>>(runner)
{
@Override
protected Ordering<Result<TopNResultValue>> makeOrdering(Query<Result<TopNResultValue>> query)
{
return ResultGranularTimestampComparator.create(
((TopNQuery) query).getGranularity(), query.isDescending()
TopNQuery topNQuery = (TopNQuery) query;
return new TopNBinaryFn(
topNQuery.getGranularity(),
topNQuery.getDimensionSpec(),
topNQuery.getTopNMetricSpec(),
topNQuery.getThreshold(),
topNQuery.getAggregatorSpecs(),
topNQuery.getPostAggregatorSpecs()
);
}
@Override
protected BinaryFn<Result<TopNResultValue>, Result<TopNResultValue>, Result<TopNResultValue>> createMergeFn(
Query<Result<TopNResultValue>> input
)
public Comparator<Result<TopNResultValue>> createResultComparator(Query<Result<TopNResultValue>> query)
{
TopNQuery query = (TopNQuery) input;
return new TopNBinaryFn(
query.getGranularity(),
query.getDimensionSpec(),
query.getTopNMetricSpec(),
query.getThreshold(),
query.getAggregatorSpecs(),
query.getPostAggregatorSpecs()
);
}
};
return ResultGranularTimestampComparator.create(query.getGranularity(), query.isDescending());
}
@Override
@ -582,7 +570,7 @@ public class TopNQueryQueryToolChest extends QueryToolChest<Result<TopNResultVal
BySegmentResultValue<Result<TopNResultValue>> value = (BySegmentResultValue<Result<TopNResultValue>>) input
.getValue();
return new Result<TopNResultValue>(
return new Result<>(
input.getTimestamp(),
new BySegmentTopNResultValue(
Lists.transform(

View File

@ -20,7 +20,6 @@
package org.apache.druid.collections;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.guava.nary.BinaryFn;
import org.apache.druid.query.Result;
import org.junit.Assert;
import org.junit.Test;
@ -39,29 +38,18 @@ public class CombiningIterableTest
public void testMerge()
{
List<Result<Object>> resultsBefore = Arrays.asList(
new Result<Object>(DateTimes.of("2011-01-01"), 1L),
new Result<Object>(DateTimes.of("2011-01-01"), 2L)
new Result<>(DateTimes.of("2011-01-01"), 1L),
new Result<>(DateTimes.of("2011-01-01"), 2L)
);
Iterable<Result<Object>> expectedResults = Collections.singletonList(
new Result<Object>(DateTimes.of("2011-01-01"), 3L)
new Result<>(DateTimes.of("2011-01-01"), 3L)
);
Iterable<Result<Object>> resultsAfter = CombiningIterable.create(
resultsBefore,
new Comparator<Result<Object>>()
{
@Override
public int compare(Result<Object> r1, Result<Object> r2)
{
return r1.getTimestamp().compareTo(r2.getTimestamp());
}
},
new BinaryFn<Result<Object>, Result<Object>, Result<Object>>()
{
@Override
public Result<Object> apply(final Result<Object> arg1, final Result<Object> arg2)
{
Comparator.comparing(Result::getTimestamp),
(arg1, arg2) -> {
if (arg1 == null) {
return arg2;
}
@ -70,12 +58,11 @@ public class CombiningIterableTest
return arg1;
}
return new Result<Object>(
return new Result<>(
arg1.getTimestamp(),
((Long) arg1.getValue()).longValue() + ((Long) arg2.getValue()).longValue()
);
}
}
);
Iterator<Result<Object>> it1 = expectedResults.iterator();

View File

@ -33,7 +33,6 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -61,7 +60,7 @@ public class DefaultQueryMetricsTest
))
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.aggregators(new CountAggregatorFactory("count"))
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null))
.build();

View File

@ -1016,7 +1016,7 @@ public class MultiValuedDimensionTest
))
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.aggregators(new CountAggregatorFactory("count"))
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null))
.build();
@ -1071,7 +1071,7 @@ public class MultiValuedDimensionTest
)
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.aggregators(new CountAggregatorFactory("count"))
.threshold(15)
.build();
@ -1132,7 +1132,7 @@ public class MultiValuedDimensionTest
)
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.aggregators(new CountAggregatorFactory("count"))
.threshold(15)
.build();

View File

@ -35,7 +35,6 @@ import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@ -63,7 +62,7 @@ public class DefaultTopNQueryMetricsTest
))
.metric("count")
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec)
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
.aggregators(new CountAggregatorFactory("count"))
.threshold(5)
.filters(new SelectorDimFilter("tags", "t3", null))
.build();

View File

@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMaxAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
import org.apache.druid.query.filter.Filter;
@ -50,10 +51,20 @@ import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
public class TopNMetricSpecOptimizationsTest
{
private static final List<AggregatorFactory> AGGS = Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
);
@Test
public void testShouldOptimizeLexicographic()
{
@ -67,18 +78,8 @@ public class TopNMetricSpecOptimizationsTest
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-31T00:00:00Z")
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.aggregators(AGGS)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
StorageAdapter adapter =
@ -111,18 +112,8 @@ public class TopNMetricSpecOptimizationsTest
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-30T01:00:00Z")
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.aggregators(AGGS)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
StorageAdapter adapter =
@ -156,18 +147,8 @@ public class TopNMetricSpecOptimizationsTest
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-30T01:00:00Z")
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.aggregators(AGGS)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
StorageAdapter adapter =
@ -202,18 +183,8 @@ public class TopNMetricSpecOptimizationsTest
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-31T00:00:00Z")
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.aggregators(AGGS)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
StorageAdapter adapter =
@ -246,18 +217,8 @@ public class TopNMetricSpecOptimizationsTest
.metric(QueryRunnerTestHelper.indexMetric)
.threshold(threshold)
.intervals("2018-05-30T00:00:00Z/2018-05-31T00:00:00Z")
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.commonDoubleAggregators,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.aggregators(AGGS)
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();

View File

@ -40,7 +40,6 @@ import org.junit.Ignore;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -78,7 +77,7 @@ public class TopNQueryRunnerBenchmark extends AbstractBenchmark
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
private static final Map<TestCases, QueryRunner> testCaseMap = new HashMap<>();

View File

@ -37,7 +37,6 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
public class TopNQueryTest
{
@ -64,7 +63,7 @@ public class TopNQueryTest
)
)
)
.postAggregators(Collections.singletonList(QueryRunnerTestHelper.addRowsIndexConstant))
.postAggregators(QueryRunnerTestHelper.addRowsIndexConstant)
.build();
String json = jsonMapper.writeValueAsString(query);
@ -122,7 +121,7 @@ public class TopNQueryTest
.metric(new DimensionTopNMetricSpec(null, StringComparators.ALPHANUMERIC))
.threshold(2)
.intervals(QueryRunnerTestHelper.fullOnIntervalSpec.getIntervals())
.aggregators(Collections.singletonList(QueryRunnerTestHelper.rowsCount))
.aggregators(QueryRunnerTestHelper.rowsCount)
.build();
String jsonQuery = "{\n"
+ " \"queryType\": \"topN\",\n"

View File

@ -123,16 +123,14 @@ public class TopNUnionQueryTest
)
)
.postAggregators(
Arrays.asList(
QueryRunnerTestHelper.addRowsIndexConstant,
QueryRunnerTestHelper.dependentPostAgg,
QueryRunnerTestHelper.hyperUniqueFinalizingPostAgg
)
)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.<Map<String, Object>>asList(

View File

@ -658,7 +658,7 @@ public class AppendTest
)
)
)
.postAggregators(Collections.singletonList(addRowsIndexConstant))
.postAggregators(addRowsIndexConstant)
.build();
}
@ -688,7 +688,7 @@ public class AppendTest
)
)
)
.postAggregators(Collections.singletonList(addRowsIndexConstant))
.postAggregators(addRowsIndexConstant)
.build();
}

View File

@ -1524,7 +1524,7 @@ public class SchemalessTestFullTest
)
)
)
.postAggregators(Collections.singletonList(addRowsIndexConstant))
.postAggregators(addRowsIndexConstant)
.build();
failMsg += " topN ";
@ -1557,7 +1557,7 @@ public class SchemalessTestFullTest
)
)
)
.postAggregators(Collections.singletonList(addRowsIndexConstant))
.postAggregators(addRowsIndexConstant)
.build();
failMsg += " filtered topN ";

View File

@ -190,11 +190,11 @@ public class SchemalessTestSimpleTest
)
)
)
.postAggregators(Collections.singletonList(addRowsIndexConstant))
.postAggregators(addRowsIndexConstant)
.build();
List<Result<TopNResultValue>> expectedResults = Collections.singletonList(
new Result<TopNResultValue>(
new Result<>(
DateTimes.of("2011-01-12T00:00:00.000Z"),
new TopNResultValue(
Arrays.asList(

View File

@ -356,7 +356,7 @@ public class IncrementalIndexStorageAdapterTest
.dimension("sally")
.metric("cnt")
.threshold(10)
.aggregators(Collections.singletonList(new LongSumAggregatorFactory("cnt", "cnt")))
.aggregators(new LongSumAggregatorFactory("cnt", "cnt"))
.build(),
new IncrementalIndexStorageAdapter(index),
null

View File

@ -360,9 +360,7 @@ public class DummyStringVirtualColumnTest
.dimension(VSTRING_DIM)
.metric(COUNT)
.threshold(1)
.aggregators(
Collections.singletonList(new CountAggregatorFactory(COUNT))
)
.aggregators(new CountAggregatorFactory(COUNT))
.virtualColumns(new DummyStringVirtualColumn(
QueryRunnerTestHelper.marketDimension,
VSTRING_DIM,

View File

@ -108,25 +108,18 @@ public class BrokerServerView implements TimelineServerView
this.selectors = new HashMap<>();
this.timelines = new HashMap<>();
this.segmentFilter = new Predicate<Pair<DruidServerMetadata, DataSegment>>()
{
@Override
public boolean apply(
Pair<DruidServerMetadata, DataSegment> input
)
{
this.segmentFilter = (Pair<DruidServerMetadata, DataSegment> metadataAndSegment) -> {
if (segmentWatcherConfig.getWatchedTiers() != null
&& !segmentWatcherConfig.getWatchedTiers().contains(input.lhs.getTier())) {
&& !segmentWatcherConfig.getWatchedTiers().contains(metadataAndSegment.lhs.getTier())) {
return false;
}
if (segmentWatcherConfig.getWatchedDataSources() != null
&& !segmentWatcherConfig.getWatchedDataSources().contains(input.rhs.getDataSource())) {
&& !segmentWatcherConfig.getWatchedDataSources().contains(metadataAndSegment.rhs.getDataSource())) {
return false;
}
return true;
}
};
ExecutorService exec = Execs.singleThreaded("BrokerServerView-%s");
baseView.registerSegmentCallback(
@ -160,14 +153,9 @@ public class BrokerServerView implements TimelineServerView
baseView.registerServerRemovedCallback(
exec,
new ServerRemovedCallback()
{
@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
server -> {
removeServer(server);
return ServerView.CallbackAction.CONTINUE;
}
return CallbackAction.CONTINUE;
}
);
}
@ -195,10 +183,10 @@ public class BrokerServerView implements TimelineServerView
private QueryableDruidServer addServer(DruidServer server)
{
QueryableDruidServer retVal = new QueryableDruidServer(server, makeDirectClient(server));
QueryableDruidServer retVal = new QueryableDruidServer<>(server, makeDirectClient(server));
QueryableDruidServer exists = clients.put(server.getName(), retVal);
if (exists != null) {
log.warn("QueryRunner for server[%s] already existed!? Well it's getting replaced", server);
log.warn("QueryRunner for server[%s] already exists!? Well it's getting replaced", server);
}
return retVal;
@ -326,7 +314,7 @@ public class BrokerServerView implements TimelineServerView
log.error("WTF?! No QueryableDruidServer found for %s", server.getName());
return null;
}
return queryableDruidServer.getClient();
return queryableDruidServer.getQueryRunner();
}
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.client.selector;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.timeline.DataSegment;
import java.util.Collections;
@ -31,7 +32,7 @@ import java.util.Set;
public class ConnectionCountServerSelectorStrategy implements ServerSelectorStrategy
{
private static final Comparator<QueryableDruidServer> COMPARATOR =
Comparator.comparingInt(s -> s.getClient().getNumOpenConnections());
Comparator.comparingInt(s -> ((DirectDruidClient) s.getQueryRunner()).getNumOpenConnections());
@Override
public QueryableDruidServer pick(Set<QueryableDruidServer> servers, DataSegment segment)

View File

@ -19,20 +19,20 @@
package org.apache.druid.client.selector;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.query.QueryRunner;
/**
*/
public class QueryableDruidServer
public class QueryableDruidServer<T extends QueryRunner>
{
private final DruidServer server;
private final DirectDruidClient client;
private final T queryRunner;
public QueryableDruidServer(DruidServer server, DirectDruidClient client)
public QueryableDruidServer(DruidServer server, T queryRunner)
{
this.server = server;
this.client = client;
this.queryRunner = queryRunner;
}
public DruidServer getServer()
@ -40,9 +40,9 @@ public class QueryableDruidServer
return server;
}
public DirectDruidClient getClient()
public T getQueryRunner()
{
return client;
return queryRunner;
}
@Override
@ -50,7 +50,7 @@ public class QueryableDruidServer
{
return "QueryableDruidServer{" +
"server=" + server +
", client=" + client +
", queryRunner=" + queryRunner +
'}';
}
}

View File

@ -941,7 +941,7 @@ public class CachingClusteredClientTest
.dimension("a")
.metric("b")
.threshold(3)
.aggregators(Collections.<AggregatorFactory>singletonList(new CountAggregatorFactory("b")))
.aggregators(new CountAggregatorFactory("b"))
.build(),
sequences
)

View File

@ -23,11 +23,9 @@ import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.js.JavaScriptConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@ -117,11 +115,7 @@ public class JavaScriptTieredBrokerSelectorStrategyTest
.dimension("bigdim")
.metric("count")
.threshold(1)
.aggregators(
ImmutableList.<AggregatorFactory>of(
new CountAggregatorFactory("count")
)
);
.aggregators(new CountAggregatorFactory("count"));
Assert.assertEquals(
Optional.absent(),
@ -145,11 +139,9 @@ public class JavaScriptTieredBrokerSelectorStrategyTest
STRATEGY.getBrokerServiceName(
tieredBrokerConfig,
queryBuilder.aggregators(
ImmutableList.of(
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("longSum", "a"),
new DoubleSumAggregatorFactory("doubleSum", "b")
)
).build()
)
);
@ -161,11 +153,9 @@ public class JavaScriptTieredBrokerSelectorStrategyTest
STRATEGY.getBrokerServiceName(
tieredBrokerConfig,
queryBuilder.aggregators(
ImmutableList.of(
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("longSum", "a"),
new DoubleSumAggregatorFactory("doubleSum", "b")
)
).build()
)
);

View File

@ -1495,9 +1495,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim1", "d0"))
.postAggregators(ImmutableList.of(
expressionPostAgg("p0", "substring(\"d0\", 1, -1)")
))
.postAggregators(expressionPostAgg("p0", "substring(\"d0\", 1, -1)"))
.metric(new DimensionTopNMetricSpec(null, StringComparators.LEXICOGRAPHIC))
.threshold(10)
.context(QUERY_CONTEXT_DEFAULT)
@ -1532,10 +1530,10 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim1", "d0"))
.postAggregators(ImmutableList.of(
.postAggregators(
expressionPostAgg("p0", "substring(\"d0\", 1, -1)"),
expressionPostAgg("p1", "strlen(\"d0\")")
))
)
.metric(new NumericTopNMetricSpec("p1"))
.threshold(10)
.context(QUERY_CONTEXT_DEFAULT)
@ -2542,15 +2540,11 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim1", "d0"))
.metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("p0")))
.aggregators(aggregators(
.aggregators(
new FloatMinAggregatorFactory("a0", "m1"),
new FloatMaxAggregatorFactory("a1", "m1")
))
.postAggregators(
ImmutableList.of(
expressionPostAgg("p0", "(\"a0\" + \"a1\")")
)
)
.postAggregators(expressionPostAgg("p0", "(\"a0\" + \"a1\")"))
.threshold(3)
.context(QUERY_CONTEXT_DEFAULT)
.build()
@ -4650,7 +4644,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim2", "d0"))
.aggregators(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.aggregators(new LongSumAggregatorFactory("a0", "cnt"))
.metric(new NumericTopNMetricSpec("a0"))
.threshold(2)
.context(QUERY_CONTEXT_DEFAULT)
@ -4717,7 +4711,7 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.intervals(querySegmentSpec(Filtration.eternity()))
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("dim2", "d0"))
.aggregators(aggregators(new LongSumAggregatorFactory("a0", "cnt")))
.aggregators(new LongSumAggregatorFactory("a0", "cnt"))
.metric(new NumericTopNMetricSpec("a0"))
.threshold(2)
.context(QUERY_CONTEXT_DEFAULT)
@ -7582,14 +7576,13 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
.granularity(Granularities.ALL)
.dimension(new DefaultDimensionSpec("m1", "d0", ValueType.FLOAT))
.filters("dim2", "a")
.aggregators(aggregators(
.aggregators(
new DoubleSumAggregatorFactory("a0:sum", "m2"),
new CountAggregatorFactory("a0:count"),
new DoubleSumAggregatorFactory("a1", "m1"),
new DoubleSumAggregatorFactory("a2", "m2")
))
)
.postAggregators(
ImmutableList.of(
new ArithmeticPostAggregator(
"a0",
"quotient",
@ -7600,7 +7593,6 @@ public class CalciteQueryTest extends BaseCalciteQueryTest
),
expressionPostAgg("p0", "(\"a1\" + \"a2\")")
)
)
.metric(new DimensionTopNMetricSpec(null, StringComparators.NUMERIC))
.threshold(5)
.context(QUERY_CONTEXT_DEFAULT)