parallel broker merges on fork join pool (#8578)

* sketch of broker parallel merges done in small batches on fork join pool

* fix non-terminating sequences, auto compute parallelism

* adjust benches

* adjust benchmarks

* now considerably faster; fixed a dumb mistake

* fix

* remove comments

* log.info for debug

* javadoc

* safer block for sequence to yielder conversion

* refactor LifecycleForkJoinPool into LifecycleForkJoinPoolProvider which wraps a ForkJoinPool

* smooth yield rate adjustment, more logs to help tune

* cleanup, less logs

* error handling, bug fixes, on by default, more parallel, more tests

* remove unused var

* comments

* timeboundary mergeFn

* simplify, more javadoc

* formatting

* pushdown config

* use nanos consistently, move logs back to debug level, bit more javadoc

* static terminal result batch

* javadoc for nullability of createMergeFn

* cleanup

* oops

* fix race, add docs

* spelling, remove todo, add unhandled exception log

* cleanup, revert unintended change

* another unintended change

* review stuff

* add ParallelMergeCombiningSequenceBenchmark, fixes

* hyper-threading is the enemy

* fix initial start delay, lol

* parallelism computer now balances partition sizes against partition counts using the square root of the sequence count instead of the sequence count divided by 2

* fix those important style issues with the benchmarks code

* lazy sequence creation for benchmarks

* more benchmark comments

* stable sequence generation time

* update defaults to use 100ms target time, 4096 batch size, 16384 initial yield, also update user docs

* add jmh thread based benchmarks, cleanup some stuff

* oops

* style

* add spread to jmh thread benchmark start range, more comments to benchmarks parameters and purpose

* retool benchmark to allow modeling more typical heterogeneous heavy workloads

* spelling

* fix

* refactor benchmarks

* formatting

* docs

* add maxThreadStartDelay parameter to threaded benchmark

* why does catch need to be on its own line but else doesn't
Clint Wylie 2019-11-07 11:58:46 -08:00 committed by Jihoon Son
parent a9aa416c3d
commit 7aafcf8bca
27 changed files with 2885 additions and 51 deletions

View File

@ -81,6 +81,12 @@
<artifactId>druid-histogram</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>

View File

@ -47,7 +47,6 @@ import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.data.input.Row;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.concurrent.Execs;
@ -66,6 +65,7 @@ import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@ -84,6 +84,7 @@ import org.apache.druid.query.groupby.GroupByQueryEngine;
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
@ -134,12 +135,13 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
@Warmup(iterations = 15)
@Measurement(iterations = 30)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
public class CachingClusteredClientBenchmark
{
private static final Logger LOG = new Logger(CachingClusteredClientBenchmark.class);
@ -147,22 +149,26 @@ public class CachingClusteredClientBenchmark
private static final String DATA_SOURCE = "ds";
public static final ObjectMapper JSON_MAPPER;
@Param({"8"})
@Param({"8", "24"})
private int numServers;
@Param({"4", "2", "1"})
private int numProcessingThreads;
@Param({"0", "1", "4"})
private int parallelism;
@Param({"75000"})
private int rowsPerSegment;
@Param({"all"})
@Param({"all", "minute"})
private String queryGranularity;
private QueryToolChestWarehouse toolChestWarehouse;
private QueryRunnerFactoryConglomerate conglomerate;
private CachingClusteredClient cachingClusteredClient;
private ExecutorService processingPool;
private ForkJoinPool forkJoinPool;
private boolean parallelCombine;
private Query query;
@ -173,6 +179,8 @@ public class CachingClusteredClientBenchmark
Collections.singletonList(basicSchema.getDataInterval())
);
private final int numProcessingThreads = 4;
static {
JSON_MAPPER = new DefaultObjectMapper();
JSON_MAPPER.setInjectableValues(
@ -188,6 +196,8 @@ public class CachingClusteredClientBenchmark
{
final String schemaName = "basic";
parallelCombine = parallelism > 0;
BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
Map<DataSegment, QueryableIndex> queryableIndexes = new HashMap<>(numServers);
@ -232,6 +242,12 @@ public class CachingClusteredClientBenchmark
{
return numProcessingThreads;
}
@Override
public boolean useParallelMergePool()
{
return true;
}
};
conglomerate = new DefaultQueryRunnerFactoryConglomerate(
@ -298,6 +314,12 @@ public class CachingClusteredClientBenchmark
}
processingPool = Execs.multiThreaded(processingConfig.getNumThreads(), "caching-clustered-client-benchmark");
forkJoinPool = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null,
true
);
cachingClusteredClient = new CachingClusteredClient(
toolChestWarehouse,
serverView,
@ -305,7 +327,9 @@ public class CachingClusteredClientBenchmark
JSON_MAPPER,
new ForegroundCachePopulator(JSON_MAPPER, new CachePopulatorStats(), 0),
new CacheConfig(),
new DruidHttpClientConfig()
new DruidHttpClientConfig(),
processingConfig,
forkJoinPool
);
}
@ -360,6 +384,7 @@ public class CachingClusteredClientBenchmark
{
closer.close();
processingPool.shutdown();
forkJoinPool.shutdownNow();
}
@Benchmark
@ -372,6 +397,12 @@ public class CachingClusteredClientBenchmark
.intervals(basicSchemaIntervalSpec)
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();
final List<Result<TimeseriesResultValue>> results = runQuery();
@ -389,11 +420,17 @@ public class CachingClusteredClientBenchmark
query = new TopNQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(basicSchemaIntervalSpec)
.dimension(new DefaultDimensionSpec("dimUniform", null))
.dimension(new DefaultDimensionSpec("dimZipf", null))
.aggregators(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.granularity(Granularity.fromString(queryGranularity))
.metric("sumLongSequential")
.threshold(10_000) // we are primarily measuring 'broker' merge time, so collect a significant number of results
.context(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();
final List<Result<TopNResultValue>> results = runQuery();
@ -413,16 +450,22 @@ public class CachingClusteredClientBenchmark
.setDataSource(DATA_SOURCE)
.setQuerySegmentSpec(basicSchemaIntervalSpec)
.setDimensions(
new DefaultDimensionSpec("dimUniform", null),
new DefaultDimensionSpec("dimZipf", null)
new DefaultDimensionSpec("dimZipf", null),
new DefaultDimensionSpec("dimSequential", null)
)
.setAggregatorSpecs(new LongSumAggregatorFactory("sumLongSequential", "sumLongSequential"))
.setGranularity(Granularity.fromString(queryGranularity))
.setContext(
ImmutableMap.of(
QueryContexts.BROKER_PARALLEL_MERGE_KEY, parallelCombine,
QueryContexts.BROKER_PARALLELISM, parallelism
)
)
.build();
final List<Row> results = runQuery();
final List<ResultRow> results = runQuery();
for (Row result : results) {
for (ResultRow result : results) {
blackhole.consume(result);
}
}

View File

@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.sequences;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.guava.MergeSequence;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.function.Supplier;
@State(Scope.Benchmark)
public class BaseParallelMergeCombiningSequenceBenchmark
{
private static final Logger log = new Logger(ParallelMergeCombiningSequenceBenchmark.class);
// default merge FJP size
static final ForkJoinPool MERGE_POOL = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
true
);
// note: parameters are broken down one per line to allow easily commenting out lines to mix and match which
// benchmarks to run
// also note: don't run this as-is unless you have days to spare
@Param({
"8",
"16",
"32",
"64"
})
int numSequences;
/**
* Strategy encodes the type of sequence and configuration parameters for that sequence.
*
* Strategies of the form: 'parallelism-{parallelism}-{targetTime}ms-{batchSize}-{yieldAfter}'
* encode the parameters for a {@link ParallelMergeCombiningSequence}.
*
* A strategy of: 'combiningMergeSequence-same-thread' (or an unrecognized value) will use a
* {@link CombiningSequence} that wraps a {@link MergeSequence}
*/
@Param({
"combiningMergeSequence-same-thread",
"parallelism-1-10ms-256-1024",
"parallelism-4-10ms-256-1024",
"parallelism-8-10ms-256-1024",
"parallelism-16-10ms-256-1024",
"parallelism-1-100ms-1024-4096",
"parallelism-4-100ms-1024-4096",
"parallelism-8-100ms-1024-4096",
"parallelism-16-100ms-1024-4096",
"parallelism-1-100ms-4096-16384",
"parallelism-4-100ms-4096-16384",
"parallelism-8-100ms-4096-16384",
"parallelism-16-100ms-4096-16384"
})
String strategy;
private int parallelism;
private int targetTaskTimeMillis;
private int batchSize;
private int yieldAfter;
private Function<List<Sequence<ParallelMergeCombiningSequenceTest.IntPair>>, Sequence<ParallelMergeCombiningSequenceTest.IntPair>> outputSequenceFactory;
@Setup(Level.Trial)
public void setup()
{
setupOutputSequence();
}
void setupOutputSequence()
{
String[] strategySplit = strategy.split("-");
if ("parallelism".equals(strategySplit[0])) {
// "parallelism-{parallelism}-{targetTime}ms-{batchSize}-{yieldAfter}"
parallelism = Integer.parseInt(strategySplit[1]);
targetTaskTimeMillis = Integer.parseInt(strategySplit[2].substring(0, strategySplit[2].length() - 2));
batchSize = Integer.parseInt(strategySplit[3]);
yieldAfter = Integer.parseInt(strategySplit[4]);
outputSequenceFactory = this::createParallelSequence;
} else {
outputSequenceFactory = this::createCombiningMergeSequence;
}
}
Sequence<ParallelMergeCombiningSequenceTest.IntPair> createParallelSequence(
List<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> inputSequences
)
{
return new ParallelMergeCombiningSequence<>(
MERGE_POOL,
inputSequences,
ParallelMergeCombiningSequenceTest.INT_PAIR_ORDERING,
ParallelMergeCombiningSequenceTest.INT_PAIR_MERGE_FN,
false,
0,
0,
parallelism,
yieldAfter,
batchSize,
targetTaskTimeMillis
);
}
Sequence<ParallelMergeCombiningSequenceTest.IntPair> createCombiningMergeSequence(
List<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> inputSequences
)
{
return CombiningSequence.create(
new MergeSequence<>(
ParallelMergeCombiningSequenceTest.INT_PAIR_ORDERING,
Sequences.simple(inputSequences)
),
ParallelMergeCombiningSequenceTest.INT_PAIR_ORDERING,
ParallelMergeCombiningSequenceTest.INT_PAIR_MERGE_FN
);
}
void consumeSequence(Blackhole blackhole, Supplier<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> supplier)
{
try {
Yielder<ParallelMergeCombiningSequenceTest.IntPair> yielder =
Yielders.each(outputSequenceFactory.apply(createInputSequences(supplier)));
ParallelMergeCombiningSequenceTest.IntPair prev;
while (!yielder.isDone()) {
prev = yielder.get();
blackhole.consume(prev);
yielder = yielder.next(yielder.get());
}
}
catch (Exception anyException) {
log.error(anyException, "benchmark failed");
throw new RuntimeException(anyException);
}
}
List<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> createInputSequences(
Supplier<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> supplier
)
{
List<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> inputSequences = new ArrayList<>(numSequences);
for (int j = 0; j < numSequences; j++) {
inputSequences.add(supplier.get());
}
return inputSequences;
}
Sequence<ParallelMergeCombiningSequenceTest.IntPair> generateSmallSequence()
{
return ParallelMergeCombiningSequenceTest.blockingSequence(
ThreadLocalRandom.current().nextInt(500, 10000),
50,
200,
-1,
0,
true
);
}
Sequence<ParallelMergeCombiningSequenceTest.IntPair> generateModeratelyLargeSequence()
{
return ParallelMergeCombiningSequenceTest.blockingSequence(
ThreadLocalRandom.current().nextInt(50_000, 75_000),
1000,
2500,
-1,
0,
true
);
}
Sequence<ParallelMergeCombiningSequenceTest.IntPair> generateLargeSequence()
{
final int numRows = ThreadLocalRandom.current().nextInt(1_500_000, 10_000_000);
final int frequency = numRows / 5;
return ParallelMergeCombiningSequenceTest.blockingSequence(
numRows,
5000,
10000,
frequency,
10,
true
);
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.apache.druid.benchmark;
package org.apache.druid.benchmark.sequences;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Ints;

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.sequences;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
@Warmup(iterations = 5)
@Measurement(iterations = 25)
public class ParallelMergeCombiningSequenceBenchmark extends BaseParallelMergeCombiningSequenceBenchmark
{
private static final Logger log = new Logger(ParallelMergeCombiningSequenceBenchmark.class);
// this should be as large as the largest value of concurrentSequenceConsumers
private static final ExecutorService CONSUMER_POOL = Execs.multiThreaded(64, "mock-http-thread");
/**
* Number of threads to run on {@link #CONSUMER_POOL}, each running {@link #consumeSequence}
*/
@Param({
"1",
"2",
"4",
"8",
"16",
"32",
"64"
})
private int concurrentSequenceConsumers;
/**
* Delay in milliseconds applied before starting each of the {@link #concurrentSequenceConsumers} consumer threads
*/
@Param({
"0",
"10",
"100",
"500",
"1000"
})
private int concurrentConsumerDelayMillis;
/**
* This encodes the type of input sequences and parameters that control their behavior.
* 'non-blocking-sequence-{numRows}' uses {@link ParallelMergeCombiningSequenceTest#nonBlockingSequence} to create,
* as you might expect, an input sequence that is lazily generated and will not block while being consumed.
*
* 'initially-blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms' uses
* {@link ParallelMergeCombiningSequenceTest#blockingSequence} to create a lazily generated input sequence that will
* initially block for a random time within the range specified in the parameter, and will not perform any additional
* blocking during further processing.
*
* 'blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms-{numberOfTimesToBlock}-{frequencyDelay}ms' uses
* {@link ParallelMergeCombiningSequenceTest#blockingSequence} to create a lazily generated input sequence that will
* initially block for a random time within the range specified in the parameter, and will additionally block at
* random, up to the specified number of times, each time for up to the delay encoded in the parameter.
*
* 'typical-distribution-sequence' will randomly produce a 'class' of input sequences at the following rates:
* - 80% probability of a small result set which has a short initial delay on the order of tens to hundreds of millis
* and input row counts of up to a few thousand
* - 20% probability of a moderately large result set which has an initial delay in the range of a few seconds
* and input sequence row counts in the 50k-75k range.
* This input sequence is only useful when testing a large number of concurrent threads.
*
* note: beware of using the blocking sequences for a direct comparison between strategies. At minimum they are
* useful for gauging behavior when sequences block, but because the sequences are not stable between strategies or
* sequence counts, much less between iterations of the same strategy, compensation in the form of running a lot of
* iterations could potentially make them more directly comparable
*/
@Param({
"non-blocking-sequence-1000",
"non-blocking-sequence-75000",
"non-blocking-sequence-10000000",
"initially-blocking-sequence-1000-100-500ms",
"initially-blocking-sequence-75000-100-500ms",
"initially-blocking-sequence-10000000-100-500ms",
"initially-blocking-sequence-1000-4000-5000ms",
"initially-blocking-sequence-75000-4000-5000ms",
"initially-blocking-sequence-10000000-4000-5000ms",
"blocking-sequence-1000-10-500ms-10-1ms",
"blocking-sequence-75000-10-500ms-10-1ms",
"blocking-sequence-10000000-10-500ms-10-1ms",
"typical-distribution-sequence"
})
String inputSequenceType;
private Supplier<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> homogeneousInputSequenceFactory;
private Function<Double, Sequence<ParallelMergeCombiningSequenceTest.IntPair>> heterogeneousInputSequenceFactory;
@Setup(Level.Trial)
public void setupInputSequenceGenerator()
{
String[] inputSequenceTypeSplit = inputSequenceType.split("-");
if ("initially".equals(inputSequenceTypeSplit[0])) {
// e.g. "initially-blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms"
final int numRows = Integer.parseInt(inputSequenceTypeSplit[3]);
final int startDelayStartMillis = Integer.parseInt(inputSequenceTypeSplit[4]);
final int startDelayEndMillis = Integer.parseInt(
inputSequenceTypeSplit[5].substring(0, inputSequenceTypeSplit[5].length() - 2)
);
homogeneousInputSequenceFactory = () ->
ParallelMergeCombiningSequenceTest.blockingSequence(
numRows,
startDelayStartMillis,
startDelayEndMillis,
-1,
0,
true
);
} else if ("blocking".equals(inputSequenceTypeSplit[0])) {
// e.g. "blocking-sequence-{numRows}-{startDelayStart}-{startDelayEnd}ms-{numberOfTimesToBlock}-{frequencyDelay}ms"
final int numRows = Integer.parseInt(inputSequenceTypeSplit[2]);
final int startDelayStartMillis = Integer.parseInt(inputSequenceTypeSplit[3]);
final int startDelayEndMillis = Integer.parseInt(
inputSequenceTypeSplit[4].substring(0, inputSequenceTypeSplit[4].length() - 2)
);
final int numberOfTimesToBlock = Integer.parseInt(inputSequenceTypeSplit[5]);
final int maxIterationDelayMillis = Integer.parseInt(
inputSequenceTypeSplit[6].substring(0, inputSequenceTypeSplit[6].length() - 2)
);
final int frequency = numRows / numberOfTimesToBlock;
homogeneousInputSequenceFactory = () ->
ParallelMergeCombiningSequenceTest.blockingSequence(
numRows,
startDelayStartMillis,
startDelayEndMillis,
frequency,
maxIterationDelayMillis,
true
);
} else if ("non".equals(inputSequenceTypeSplit[0])) {
// e.g. "non-blocking-sequence-{numRows}"
final int numRows = Integer.parseInt(inputSequenceTypeSplit[3]);
homogeneousInputSequenceFactory = () ->
ParallelMergeCombiningSequenceTest.nonBlockingSequence(numRows, true);
} else { // "typical distribution" input sequence
// approximately 80% of threads will merge/combine small result sets between 500-10k results per input sequence
// blocking for 50-200 ms before initial results are yielded
// approximately 20% of threads will merge/combine moderate sized result sets between 50k-75k per input
// sequence, blocking for 1000-2500 ms before initial results are yielded
heterogeneousInputSequenceFactory = (d) -> {
if (d < 0.80) { // small queries
return generateSmallSequence();
} else { // moderately large queries
return generateModeratelyLargeSequence();
}
};
}
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void exec(Blackhole blackhole) throws Exception
{
List<Future> futures = createConsumers(blackhole, concurrentSequenceConsumers, concurrentConsumerDelayMillis);
for (int i = 0; i < concurrentSequenceConsumers; i++) {
blackhole.consume(futures.get(i).get());
}
blackhole.consume(futures);
}
private List<Future> createConsumers(Blackhole blackhole, int consumers, int delayMillis) throws Exception
{
List<Future> futures = new ArrayList<>(consumers);
for (int i = 0; i < consumers; i++) {
if (delayMillis > 0) {
Thread.sleep(delayMillis);
}
if (heterogeneousInputSequenceFactory != null) {
double d = ThreadLocalRandom.current().nextDouble(0.0, 1.0);
futures.add(
CONSUMER_POOL.submit(() -> consumeSequence(blackhole, () -> heterogeneousInputSequenceFactory.apply(d)))
);
} else {
futures.add(CONSUMER_POOL.submit(() -> consumeSequence(blackhole, homogeneousInputSequenceFactory)));
}
}
return futures;
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(ParallelMergeCombiningSequenceBenchmark.class.getSimpleName())
.forks(1)
.syncIterations(true)
.resultFormat(ResultFormatType.CSV)
.result("parallel-merge-combining-sequence.csv")
.build();
new Runner(opt).run();
}
}

View File

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark.sequences;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequenceTest;
import org.apache.druid.java.util.common.guava.Sequence;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@State(Scope.Benchmark)
@Fork(value = 1, jvmArgsAppend = "-XX:+UseG1GC")
@Warmup(iterations = 5)
@Measurement(iterations = 25)
public class ParallelMergeCombiningSequenceThreadedBenchmark extends BaseParallelMergeCombiningSequenceBenchmark
{
@Param({
"0",
"100",
"500"
})
int maxThreadStartDelay;
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Group("consumers")
@GroupThreads(4)
public void consumeSmall(Blackhole blackhole)
{
consumeSequence(blackhole, this::generateSmallSequence);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@Group("consumers")
@GroupThreads(1)
public void consumeModeratelyLarge(Blackhole blackhole)
{
consumeSequence(blackhole, this::generateModeratelyLargeSequence);
}
@Override
void consumeSequence(Blackhole blackhole, Supplier<Sequence<ParallelMergeCombiningSequenceTest.IntPair>> supplier)
{
int delay = maxThreadStartDelay > 0 ? ThreadLocalRandom.current().nextInt(0, maxThreadStartDelay) : 0;
if (delay > 0) {
try {
Thread.sleep(delay);
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
super.consumeSequence(blackhole, supplier);
}
}

View File

@ -32,4 +32,9 @@ public class RE extends RuntimeException
{
super(StringUtils.nonStrictFormat(formatText, arguments), cause);
}
public RE(Throwable cause)
{
super(cause == null ? null : cause.getMessage(), cause);
}
}

View File

@ -0,0 +1,800 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common.guava;
import com.google.common.collect.Ordering;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BinaryOperator;
public class ParallelMergeCombiningSequenceTest
{
private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class);
public static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs);
public static final BinaryOperator<IntPair> INT_PAIR_MERGE_FN = (lhs, rhs) -> {
if (lhs == null) {
return rhs;
}
if (rhs == null) {
return lhs;
}
return new IntPair(lhs.lhs, lhs.rhs + rhs.rhs);
};
private ForkJoinPool pool;
@Before
public void setup()
{
pool = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
true
);
}
@After
public void teardown()
{
pool.shutdown();
}
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testOrderedResultBatchFromSequence() throws IOException
{
Sequence<IntPair> rawSequence = nonBlockingSequence(5000);
ParallelMergeCombiningSequence.YielderBatchedResultsCursor<IntPair> cursor =
new ParallelMergeCombiningSequence.YielderBatchedResultsCursor<>(
new ParallelMergeCombiningSequence.SequenceBatcher<>(rawSequence, 128),
INT_PAIR_ORDERING
);
cursor.initialize();
Yielder<IntPair> rawYielder = Yielders.each(rawSequence);
IntPair prev = null;
while (!rawYielder.isDone() && !cursor.isDone()) {
Assert.assertEquals(rawYielder.get(), cursor.get());
Assert.assertNotEquals(cursor.get(), prev);
prev = cursor.get();
rawYielder = rawYielder.next(rawYielder.get());
cursor.advance();
}
cursor.close();
rawYielder.close();
}
@Test
public void testOrderedResultBatchFromSequenceBackToYielderOnSequence() throws IOException
{
final int batchSize = 128;
final int sequenceSize = 5_000;
Sequence<IntPair> rawSequence = nonBlockingSequence(sequenceSize);
ParallelMergeCombiningSequence.YielderBatchedResultsCursor<IntPair> cursor =
new ParallelMergeCombiningSequence.YielderBatchedResultsCursor<>(
new ParallelMergeCombiningSequence.SequenceBatcher<>(rawSequence, 128),
INT_PAIR_ORDERING
);
cursor.initialize();
Yielder<IntPair> rawYielder = Yielders.each(rawSequence);
ArrayBlockingQueue<ParallelMergeCombiningSequence.ResultBatch<IntPair>> outputQueue =
new ArrayBlockingQueue<>((int) Math.ceil(((double) sequenceSize / batchSize) + 2));
IntPair prev = null;
ParallelMergeCombiningSequence.ResultBatch<IntPair> currentBatch =
new ParallelMergeCombiningSequence.ResultBatch<>(batchSize);
int batchCounter = 0;
while (!rawYielder.isDone() && !cursor.isDone()) {
Assert.assertEquals(rawYielder.get(), cursor.get());
Assert.assertNotEquals(cursor.get(), prev);
prev = cursor.get();
currentBatch.add(prev);
batchCounter++;
if (batchCounter >= batchSize) {
outputQueue.offer(currentBatch);
currentBatch = new ParallelMergeCombiningSequence.ResultBatch<>(batchSize);
batchCounter = 0;
}
rawYielder = rawYielder.next(rawYielder.get());
cursor.advance();
}
if (!currentBatch.isDrained()) {
outputQueue.offer(currentBatch);
}
outputQueue.offer(ParallelMergeCombiningSequence.ResultBatch.TERMINAL);
rawYielder.close();
cursor.close();
rawYielder = Yielders.each(rawSequence);
Sequence<IntPair> queueAsSequence = ParallelMergeCombiningSequence.makeOutputSequenceForQueue(
outputQueue,
true,
System.nanoTime() + TimeUnit.NANOSECONDS.convert(10_000, TimeUnit.MILLISECONDS),
new ParallelMergeCombiningSequence.CancellationGizmo()
);
Yielder<IntPair> queueYielder = Yielders.each(queueAsSequence);
while (!rawYielder.isDone() && !queueYielder.isDone()) {
Assert.assertEquals(rawYielder.get(), queueYielder.get());
Assert.assertNotEquals(queueYielder.get(), prev);
prev = queueYielder.get();
rawYielder = rawYielder.next(rawYielder.get());
queueYielder = queueYielder.next(queueYielder.get());
}
rawYielder.close();
queueYielder.close();
}
@Test
public void testOrderedResultBatchFromSequenceToBlockingQueueCursor() throws IOException
{
final int batchSize = 128;
final int sequenceSize = 5_000;
Sequence<IntPair> rawSequence = nonBlockingSequence(sequenceSize);
ParallelMergeCombiningSequence.YielderBatchedResultsCursor<IntPair> cursor =
new ParallelMergeCombiningSequence.YielderBatchedResultsCursor<>(
new ParallelMergeCombiningSequence.SequenceBatcher<>(rawSequence, 128),
INT_PAIR_ORDERING
);
cursor.initialize();
Yielder<IntPair> rawYielder = Yielders.each(rawSequence);
ArrayBlockingQueue<ParallelMergeCombiningSequence.ResultBatch<IntPair>> outputQueue =
new ArrayBlockingQueue<>((int) Math.ceil(((double) sequenceSize / batchSize) + 2));
IntPair prev = null;
ParallelMergeCombiningSequence.ResultBatch<IntPair> currentBatch =
new ParallelMergeCombiningSequence.ResultBatch<>(batchSize);
int batchCounter = 0;
while (!rawYielder.isDone() && !cursor.isDone()) {
Assert.assertEquals(rawYielder.get(), cursor.get());
Assert.assertNotEquals(cursor.get(), prev);
prev = cursor.get();
currentBatch.add(prev);
batchCounter++;
if (batchCounter >= batchSize) {
outputQueue.offer(currentBatch);
currentBatch = new ParallelMergeCombiningSequence.ResultBatch<>(batchSize);
batchCounter = 0;
}
rawYielder = rawYielder.next(rawYielder.get());
cursor.advance();
}
if (!currentBatch.isDrained()) {
outputQueue.offer(currentBatch);
}
outputQueue.offer(ParallelMergeCombiningSequence.ResultBatch.TERMINAL);
rawYielder.close();
cursor.close();
rawYielder = Yielders.each(rawSequence);
ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor<IntPair> queueCursor =
new ParallelMergeCombiningSequence.BlockingQueueuBatchedResultsCursor<>(
outputQueue,
INT_PAIR_ORDERING,
false,
-1L
);
queueCursor.initialize();
prev = null;
while (!rawYielder.isDone() && !queueCursor.isDone()) {
Assert.assertEquals(rawYielder.get(), queueCursor.get());
Assert.assertNotEquals(queueCursor.get(), prev);
prev = queueCursor.get();
rawYielder = rawYielder.next(rawYielder.get());
queueCursor.advance();
}
rawYielder.close();
queueCursor.close();
}
@Test
public void testNone() throws Exception
{
List<Sequence<IntPair>> input = new ArrayList<>();
assertResult(input);
}
@Test
public void testEmpties() throws Exception
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(Sequences.empty());
input.add(Sequences.empty());
assertResult(input);
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
assertResult(input);
}
@Test
public void testEmptiesAndNonEmpty() throws Exception
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(Sequences.empty());
input.add(nonBlockingSequence(5));
assertResult(input);
input.clear();
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(nonBlockingSequence(5));
assertResult(input);
}
@Test
public void testAllInSingleBatch() throws Exception
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
assertResult(input, 10, 20);
input.clear();
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(4));
input.add(nonBlockingSequence(6));
assertResult(input, 10, 20);
}
@Test
public void testAllInSingleYield() throws Exception
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
assertResult(input, 4, 20);
input.clear();
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(4));
input.add(nonBlockingSequence(6));
assertResult(input, 4, 20);
}
@Test
public void testMultiBatchMultiYield() throws Exception
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(15));
input.add(nonBlockingSequence(26));
assertResult(input, 5, 10);
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(nonBlockingSequence(15));
input.add(nonBlockingSequence(33));
input.add(nonBlockingSequence(17));
input.add(nonBlockingSequence(14));
assertResult(input, 5, 10);
}
@Test
public void testMixedSingleAndMultiYield() throws Exception
{
// below min threshold, so will merge serially
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(60));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
assertResult(input, 5, 10);
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(nonBlockingSequence(1));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(32));
assertResult(input, 5, 10);
}
@Test
public void testLongerSequencesJustForFun() throws Exception
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(10_000));
input.add(nonBlockingSequence(9_001));
assertResult(input, 128, 1024);
input.add(nonBlockingSequence(7_777));
input.add(nonBlockingSequence(8_500));
input.add(nonBlockingSequence(5_000));
input.add(nonBlockingSequence(8_888));
assertResult(input, 128, 1024);
}
@Test
public void testExceptionOnInputSequenceRead() throws Exception
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(explodingSequence(15));
input.add(nonBlockingSequence(25));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(25));
input.add(explodingSequence(11));
input.add(nonBlockingSequence(12));
assertException(input);
}
@Test
public void testExceptionFirstResultFromSequence() throws Exception
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(explodingSequence(0));
input.add(nonBlockingSequence(2));
input.add(nonBlockingSequence(2));
input.add(nonBlockingSequence(2));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
}
@Test
public void testExceptionFirstResultFromMultipleSequence() throws Exception
{
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(explodingSequence(0));
input.add(explodingSequence(0));
input.add(explodingSequence(0));
input.add(nonBlockingSequence(2));
input.add(nonBlockingSequence(2));
input.add(nonBlockingSequence(2));
expectedException.expect(RuntimeException.class);
expectedException.expectMessage(
"exploded"
);
assertException(input);
}
@Test
public void testTimeoutExceptionDueToStalledInput() throws Exception
{
final int someSize = 2048;
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(blockingSequence(someSize, 400, 500, 1, 500, true));
expectedException.expect(RuntimeException.class);
expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
expectedException.expectMessage("Sequence iterator timed out waiting for data");
assertException(
input,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
1000L,
0
);
}
@Test
public void testTimeoutExceptionDueToStalledReader() throws Exception
{
final int someSize = 2048;
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
expectedException.expect(RuntimeException.class);
expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
expectedException.expectMessage("Sequence iterator timed out");
assertException(input, 8, 64, 1000, 500);
}
private void assertResult(List<Sequence<IntPair>> sequences) throws InterruptedException, IOException
{
assertResult(
sequences,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS
);
}
private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter)
throws InterruptedException, IOException
{
final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create(
new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)),
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN
);
final ParallelMergeCombiningSequence<IntPair> parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>(
pool,
sequences,
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN,
true,
5000,
0,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5),
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
);
Yielder<IntPair> combiningYielder = Yielders.each(combiningSequence);
Yielder<IntPair> parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence);
IntPair prev = null;
while (!combiningYielder.isDone() && !parallelMergeCombineYielder.isDone()) {
Assert.assertEquals(combiningYielder.get(), parallelMergeCombineYielder.get());
Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev);
prev = parallelMergeCombineYielder.get();
combiningYielder = combiningYielder.next(combiningYielder.get());
parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
}
Assert.assertTrue(combiningYielder.isDone());
Assert.assertTrue(parallelMergeCombineYielder.isDone());
while (pool.getRunningThreadCount() > 0) {
Thread.sleep(100);
}
Assert.assertEquals(0, pool.getRunningThreadCount());
combiningYielder.close();
parallelMergeCombineYielder.close();
}
private void assertException(List<Sequence<IntPair>> sequences) throws Exception
{
assertException(
sequences,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
5000L,
0
);
}
private void assertException(
List<Sequence<IntPair>> sequences,
int batchSize,
int yieldAfter,
long timeout,
int readDelayMillis
)
throws Exception
{
try {
final ParallelMergeCombiningSequence<IntPair> parallelMergeCombineSequence = new ParallelMergeCombiningSequence<>(
pool,
sequences,
INT_PAIR_ORDERING,
INT_PAIR_MERGE_FN,
true,
timeout,
0,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5),
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
);
Yielder<IntPair> parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence);
IntPair prev = null;
while (!parallelMergeCombineYielder.isDone()) {
Assert.assertNotEquals(parallelMergeCombineYielder.get(), prev);
prev = parallelMergeCombineYielder.get();
if (readDelayMillis > 0 && ThreadLocalRandom.current().nextBoolean()) {
Thread.sleep(readDelayMillis);
}
parallelMergeCombineYielder = parallelMergeCombineYielder.next(parallelMergeCombineYielder.get());
}
parallelMergeCombineYielder.close();
}
catch (Exception ex) {
LOG.warn(ex, "exception:");
throw ex;
}
}
public static class IntPair extends Pair<Integer, Integer>
{
private IntPair(Integer lhs, Integer rhs)
{
super(lhs, rhs);
}
}
/**
* Generate an ordered, random valued, non-blocking sequence of {@link IntPair}, optionally lazy generated with
* the implication that every time a sequence is accumulated or yielded it produces <b>different</b> results,
* which sort of breaks the {@link Sequence} contract and makes this method unsuitable for tests in lazy mode;
* however, it is useful for benchmarking, where having a sequence without having to materialize the entire thing
* up front on heap with a {@link List} backing is preferable.
*/
public static Sequence<IntPair> nonBlockingSequence(int size, boolean lazyGenerate)
{
List<IntPair> pairs = lazyGenerate ? null : generateOrderedPairs(size);
return new BaseSequence<>(
new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>()
{
@Override
public Iterator<IntPair> make()
{
return new Iterator<IntPair>()
{
int mergeKey = 0;
int rowCounter = 0;
@Override
public boolean hasNext()
{
return rowCounter < size;
}
@Override
public IntPair next()
{
if (lazyGenerate) {
rowCounter++;
mergeKey += incrementMergeKeyAmount();
return makeIntPair(mergeKey);
} else {
return pairs.get(rowCounter++);
}
}
};
}
@Override
public void cleanup(Iterator<IntPair> iterFromMake)
{
// nothing to cleanup
}
}
);
}
/**
* Generate an ordered, random valued, blocking sequence of {@link IntPair}, optionally lazy generated. See
* {@link ParallelMergeCombiningSequenceTest#nonBlockingSequence(int)} for the implications of lazy generating a
* sequence; to summarize, each time the sequence is accumulated or yielded it produces different results.
*
* This sequence simulates blocking using {@link Thread#sleep(long)}, with an initial millisecond delay range defined
* by {@param startDelayStartMillis} and {@param startDelayEndMillis} that defines how long to block before the first
* sequence value will be produced, and {@param maxIterationDelayMillis} that defines how long to block every
* {@param iterationDelayFrequency} rows.
*/
public static Sequence<IntPair> blockingSequence(
int size,
int startDelayStartMillis,
int startDelayEndMillis,
int iterationDelayFrequency,
int maxIterationDelayMillis,
boolean lazyGenerate
)
{
final List<IntPair> pairs = lazyGenerate ? null : generateOrderedPairs(size);
final long startDelayMillis = ThreadLocalRandom.current().nextLong(startDelayStartMillis, startDelayEndMillis);
final long delayUntil = System.nanoTime() + TimeUnit.NANOSECONDS.convert(startDelayMillis, TimeUnit.MILLISECONDS);
return new BaseSequence<>(
new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>()
{
@Override
public Iterator<IntPair> make()
{
return new Iterator<IntPair>()
{
int mergeKey = 0;
int rowCounter = 0;
@Override
public boolean hasNext()
{
return rowCounter < size;
}
@Override
public IntPair next()
{
try {
final long currentNano = System.nanoTime();
if (rowCounter == 0 && currentNano < delayUntil) {
final long sleepMillis = Math.max(
TimeUnit.MILLISECONDS.convert(delayUntil - currentNano, TimeUnit.NANOSECONDS),
1
);
Thread.sleep(sleepMillis);
} else if (maxIterationDelayMillis > 0
&& rowCounter % iterationDelayFrequency == 0
&& ThreadLocalRandom.current().nextBoolean()) {
final int delayMillis = Math.max(ThreadLocalRandom.current().nextInt(maxIterationDelayMillis), 1);
Thread.sleep(delayMillis);
}
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
if (lazyGenerate) {
rowCounter++;
mergeKey += incrementMergeKeyAmount();
return makeIntPair(mergeKey);
} else {
return pairs.get(rowCounter++);
}
}
};
}
@Override
public void cleanup(Iterator<IntPair> iterFromMake)
{
// nothing to cleanup
}
}
);
}
/**
* Generate a non-blocking sequence for tests, non-lazy so that the sequence produces consistent results
*/
private static Sequence<IntPair> nonBlockingSequence(int size)
{
return nonBlockingSequence(size, false);
}
/**
* Generate a sequence that explodes after {@param explodeAfter} rows
*/
private static Sequence<IntPair> explodingSequence(int explodeAfter)
{
final int explodeAt = explodeAfter + 1;
return new BaseSequence<>(
new BaseSequence.IteratorMaker<IntPair, Iterator<IntPair>>()
{
@Override
public Iterator<IntPair> make()
{
return new Iterator<IntPair>()
{
int mergeKey = 0;
int rowCounter = 0;
@Override
public boolean hasNext()
{
return rowCounter < explodeAt;
}
@Override
public IntPair next()
{
if (rowCounter == explodeAfter) {
throw new RuntimeException("exploded");
}
mergeKey += incrementMergeKeyAmount();
rowCounter++;
return makeIntPair(mergeKey);
}
};
}
@Override
public void cleanup(Iterator<IntPair> iterFromMake)
{
// nothing to cleanup
}
}
);
}
private static List<IntPair> generateOrderedPairs(int length)
{
int rowCounter = 0;
int mergeKey = 0;
List<IntPair> generatedSequence = new ArrayList<>(length);
while (rowCounter < length) {
mergeKey += incrementMergeKeyAmount();
generatedSequence.add(makeIntPair(mergeKey));
rowCounter++;
}
return generatedSequence;
}
private static int incrementMergeKeyAmount()
{
return ThreadLocalRandom.current().nextInt(1, 3);
}
private static IntPair makeIntPair(int mergeKey)
{
return new IntPair(mergeKey, ThreadLocalRandom.current().nextInt(1, 100));
}
}

View File

@ -1493,6 +1493,13 @@ The broker uses processing configs for nested groupBy queries. And, if you use g
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|`druid.processing.merge.useParallelMergePool`|Enable automatic parallel merging for Brokers on a dedicated async ForkJoinPool. If `false`, merges will instead be done serially on the `HTTP` thread pool.|`true`|
|`druid.processing.merge.pool.parallelism`|Size of ForkJoinPool. Note that the default configuration assumes that the value returned by `Runtime.getRuntime().availableProcessors()` represents 2 hyper-threads per physical core, and multiplies this value by `0.75` in an attempt to size the pool at `1.5` times the number of _physical_ cores.|`Runtime.getRuntime().availableProcessors() * 0.75` (rounded up)|
|`druid.processing.merge.pool.defaultMaxQueryParallelism`|Default maximum number of parallel merge tasks per query. Note that the default configuration assumes that the value returned by `Runtime.getRuntime().availableProcessors()` represents 2 hyper-threads per physical core, and multiplies this value by `0.5` in an attempt to match the number of _physical_ cores.|`Runtime.getRuntime().availableProcessors() * 0.5` (rounded up)|
|`druid.processing.merge.pool.awaitShutdownMillis`|Time to wait for merge ForkJoinPool tasks to complete before ungracefully stopping on process shutdown in milliseconds.|`60_000`|
|`druid.processing.merge.task.targetRunTimeMillis`|Ideal run-time of each ForkJoinPool merge task, before forking off a new task to continue merging sequences.|`100`|
|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task, before forking off a new task to continue merging sequences.|`16384`|
|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks.|`4096`|
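For reference, a minimal sketch of how these defaults work out (illustrative variable names; the formulas simply mirror the descriptions in the table above):

```java
// Hedged sketch of the default sizing formulas described above, assuming
// availableProcessors() reports two hyper-threads per physical core.
int logicalCores = Runtime.getRuntime().availableProcessors();
int mergePoolParallelism = (int) Math.ceil(logicalCores * 0.75);       // roughly 1.5x the physical core count
int defaultMaxQueryParallelism = (int) Math.ceil(logicalCores * 0.5);  // roughly the physical core count
```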
The amount of direct memory needed by Druid is at least
`druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1)`. You can

View File

@ -41,6 +41,11 @@ The query context is used for various query configuration parameters. The follow
|maxQueuedBytes | `druid.broker.http.maxQueuedBytes` | Maximum number of bytes queued per query before exerting backpressure on the channel to the data server. Similar to `maxScatterGatherBytes`, except unlike that configuration, this one will trigger backpressure rather than query failure. Zero means disabled.|
|serializeDateTimeAsLong| `false` | If true, DateTime is serialized as long in the result returned by Broker and the data transportation between Broker and compute process|
|serializeDateTimeAsLongInner| `false` | If true, DateTime is serialized as long in the data transportation between Broker and compute process|
|enableParallelMerge|`true`|Enable parallel result merging on the Broker. Note that `druid.processing.merge.useParallelMergePool` must be enabled for this setting to have any effect. See [Broker configuration](../configuration/index.html#broker) for more details.|
|parallelMergeParallelism|`druid.processing.merge.pool.parallelism`|Maximum number of parallel threads to use for parallel result merging on the Broker. See [Broker configuration](../configuration/index.html#broker) for more details.|
|parallelMergeInitialYieldRows|`druid.processing.merge.task.initialYieldNumRows`|Number of rows to yield per ForkJoinPool merge task for parallel result merging on the Broker, before forking off a new task to continue merging sequences. See [Broker configuration](../configuration/index.html#broker) for more details.|
|parallelMergeSmallBatchRows|`druid.processing.merge.task.smallBatchNumRows`|Size of result batches to operate on in ForkJoinPool merge tasks for parallel result merging on the Broker. See [Broker configuration](../configuration/index.html#broker) for more details.|
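As an illustrative sketch only (the keys come from the table above; the values shown are not defaults), these parameters are supplied through the query context like any other context entry:

```java
// Hedged sketch: supplying the parallel merge context keys from the table above
// (values here are purely illustrative). ImmutableMap is Guava's.
Map<String, Object> parallelMergeContext = ImmutableMap.of(
    "enableParallelMerge", true,
    "parallelMergeParallelism", 4,
    "parallelMergeInitialYieldRows", 16384,
    "parallelMergeSmallBatchRows", 4096
);
```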
In addition, some query types offer context parameters specific to that query type.

View File

@ -32,6 +32,9 @@ import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.query.context.ResponseContext;
import java.util.Comparator;
import java.util.function.BinaryOperator;
public class MaterializedViewQueryQueryToolChest extends QueryToolChest
{
private final QueryToolChestWarehouse warehouse;
@ -58,6 +61,20 @@ public class MaterializedViewQueryQueryToolChest extends QueryToolChest
};
}
@Override
public BinaryOperator createMergeFn(Query query)
{
final Query realQuery = getRealQuery(query);
return warehouse.getToolChest(realQuery).createMergeFn(realQuery);
}
@Override
public Comparator createResultComparator(Query query)
{
final Query realQuery = getRealQuery(query);
return warehouse.getToolChest(realQuery).createResultComparator(realQuery);
}
@Override
public QueryMetrics makeMetrics(Query query)
{

View File

@ -51,6 +51,7 @@ import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
@ -84,6 +85,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
/**
* Base class for implementing MovingAverageQuery tests
@ -349,7 +351,16 @@ public class MovingAverageQueryTest
{
return 0L;
}
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
},
ForkJoinPool.commonPool()
);
ClientQuerySegmentWalker walker = new ClientQuerySegmentWalker(

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class LifecycleForkJoinPoolProvider
{
private static final Logger LOG = new Logger(LifecycleForkJoinPoolProvider.class);
private final long awaitShutdownMillis;
private final ForkJoinPool pool;
public LifecycleForkJoinPoolProvider(
int parallelism,
ForkJoinPool.ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
boolean asyncMode,
long awaitShutdownMillis
)
{
this.pool = new ForkJoinPool(parallelism, factory, handler, asyncMode);
this.awaitShutdownMillis = awaitShutdownMillis;
}
@LifecycleStop
public void stop()
{
LOG.info("Shutting down ForkJoinPool [%s]", this);
pool.shutdown();
try {
if (!pool.awaitTermination(awaitShutdownMillis, TimeUnit.MILLISECONDS)) {
LOG.warn("Failed to complete all tasks in FJP [%s]", this);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted on shutdown", e);
}
}
public ForkJoinPool getPool()
{
return pool;
}
}
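
A minimal usage sketch (not part of this commit), constructing the provider directly and borrowing its pool, roughly mirroring what `DruidProcessingModule` does via Guice later in this change; the parallelism and shutdown timeout values here are assumptions:

```java
import org.apache.druid.guice.LifecycleForkJoinPoolProvider;

import java.util.concurrent.ForkJoinPool;

public class LifecycleForkJoinPoolProviderSketch
{
  public static void main(String[] args)
  {
    LifecycleForkJoinPoolProvider provider = new LifecycleForkJoinPoolProvider(
        4,                                               // parallelism (assumed value)
        ForkJoinPool.defaultForkJoinWorkerThreadFactory, // standard worker thread factory
        (t, e) -> e.printStackTrace(),                   // uncaught exception handler
        true,                                            // asyncMode: FIFO scheduling of queued tasks
        60_000L                                          // awaitShutdownMillis (assumed value)
    );

    ForkJoinPool pool = provider.getPool();
    pool.submit(() -> System.out.println("merge work would run here"));

    provider.stop(); // shuts down the pool and waits up to awaitShutdownMillis
  }
}
```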

View File

@ -20,6 +20,7 @@
package org.apache.druid.query;
import org.apache.druid.java.util.common.concurrent.ExecutorServiceConfig;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.utils.JvmUtils;
@ -34,6 +35,7 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
public static final int DEFAULT_NUM_MERGE_BUFFERS = -1;
public static final int DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = -1;
public static final int MAX_DEFAULT_PROCESSING_BUFFER_SIZE_BYTES = 1024 * 1024 * 1024;
public static final int DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS = 60_000;
private AtomicReference<Integer> computedBufferSizeBytes = new AtomicReference<>();
@ -144,4 +146,59 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
{
return System.getProperty("java.io.tmpdir");
}
@Config(value = "${base_path}.merge.useParallelMergePool")
public boolean useParallelMergePool()
{
return true;
}
@Config(value = "${base_path}.merge.pool.parallelism")
public int getNumThreadsMergePoolConfigured()
{
return DEFAULT_NUM_THREADS;
}
public int getMergePoolParallelism()
{
int numThreadsConfigured = getNumThreadsMergePoolConfigured();
if (numThreadsConfigured != DEFAULT_NUM_THREADS) {
return numThreadsConfigured;
} else {
// assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5
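// e.g. a machine reporting 16 logical processors yields (int) Math.ceil(16 * 0.75) = 12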
return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
}
}
@Config(value = "${base_path}.merge.pool.awaitShutdownMillis")
public long getMergePoolAwaitShutdownMillis()
{
return DEFAULT_MERGE_POOL_AWAIT_SHUTDOWN_MILLIS;
}
@Config(value = "${base_path}.merge.pool.defaultMaxQueryParallelism")
public int getMergePoolDefaultMaxQueryParallelism()
{
// assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores
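// e.g. a machine reporting 16 logical processors yields (int) Math.max(16 * 0.5, 1) = 8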
return (int) Math.max(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.5, 1);
}
@Config(value = "${base_path}.merge.task.targetRunTimeMillis")
public int getMergePoolTargetTaskRunTimeMillis()
{
return ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS;
}
@Config(value = "${base_path}.merge.task.initialYieldNumRows")
public int getMergePoolTaskInitialYieldRows()
{
return ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS;
}
@Config(value = "${base_path}.merge.task.smallBatchNumRows")
public int getMergePoolSmallBatchRows()
{
return ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS;
}
}

View File

@ -39,6 +39,11 @@ public class QueryContexts
public static final String MAX_SCATTER_GATHER_BYTES_KEY = "maxScatterGatherBytes";
public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes";
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
public static final String BROKER_PARALLEL_MERGE_KEY = "enableParallelMerge";
public static final String BROKER_PARALLEL_MERGE_INITIAL_YIELD_ROWS_KEY = "parallelMergeInitialYieldRows";
public static final String BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY = "parallelMergeSmallBatchRows";
public static final String BROKER_PARALLELISM = "parallelMergeParallelism";
@Deprecated
public static final String CHUNK_PERIOD_KEY = "chunkPeriod";
public static final String VECTORIZE_KEY = "vectorize";
@ -54,6 +59,7 @@ public class QueryContexts
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
public static final long NO_TIMEOUT = 0;
public static final boolean DEFAULT_ENABLE_PARALLEL_MERGE = true;
@SuppressWarnings("unused") // Used by Jackson serialization
public enum Vectorize
@ -195,6 +201,26 @@ public class QueryContexts
return parseInt(query, PRIORITY_KEY, defaultValue);
}
public static <T> boolean getEnableParallelMerges(Query<T> query)
{
return parseBoolean(query, BROKER_PARALLEL_MERGE_KEY, DEFAULT_ENABLE_PARALLEL_MERGE);
}
public static <T> int getParallelMergeInitialYieldRows(Query<T> query, int defaultValue)
{
return parseInt(query, BROKER_PARALLEL_MERGE_INITIAL_YIELD_ROWS_KEY, defaultValue);
}
public static <T> int getParallelMergeSmallBatchRows(Query<T> query, int defaultValue)
{
return parseInt(query, BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY, defaultValue);
}
public static <T> int getParallelMergeParallelism(Query<T> query, int defaultValue)
{
return parseInt(query, BROKER_PARALLELISM, defaultValue);
}
@Deprecated
public static <T> String getChunkPeriod(Query<T> query)
{

View File

@ -111,11 +111,17 @@ public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultT
/**
* Creates a merge function that is used to merge intermediate aggregates from historicals in broker. This merge
* function is used in the default {@link ResultMergeQueryRunner} provided by
* {@link QueryToolChest#mergeResults(QueryRunner)} and can be used in additional future merge implementations
* {@link QueryToolChest#mergeResults(QueryRunner)} and also used in
* {@link org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence} by 'CachingClusteredClient' if it
* does not return null.
*
* Returning null from this function means that a query does not support result merging, at
* least via the mechanisms that utilize this function.
*/
@Nullable
public BinaryOperator<ResultType> createMergeFn(Query<ResultType> query)
{
throw new UOE("%s doesn't provide a merge function", query.getClass().getName());
return null;
}
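
As an illustration (not part of this commit), a hypothetical toolchest merge function; the null-tolerant shape follows the TimeBoundary implementation later in this change, and the `Long` result type is purely an assumption:

```java
import java.util.function.BinaryOperator;

public class MergeFnSketch
{
  // Hypothetical: a null-tolerant merge function that drops a missing side rather than
  // failing; the summing behavior is illustrative only.
  static final BinaryOperator<Long> SUM_MERGE = (a, b) -> {
    if (a == null) {
      return b;
    }
    return b == null ? a : a + b;
  };

  public static void main(String[] args)
  {
    System.out.println(SUM_MERGE.apply(3L, null)); // 3
    System.out.println(SUM_MERGE.apply(3L, 4L));   // 7
  }
}
```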
/**

View File

@ -19,6 +19,7 @@
package org.apache.druid.query;
import com.google.common.base.Preconditions;
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.guava.Sequence;
@ -43,6 +44,8 @@ public class ResultMergeQueryRunner<T> extends BySegmentSkippingQueryRunner<T>
)
{
super(baseRunner);
Preconditions.checkNotNull(comparatorGenerator);
Preconditions.checkNotNull(mergeFnGenerator);
this.comparatorGenerator = comparatorGenerator;
this.mergeFnGenerator = mergeFnGenerator;
}

View File

@ -113,7 +113,7 @@ public interface GroupByStrategy
@Nullable
default BinaryOperator<ResultRow> createMergeFn(Query<ResultRow> query)
{
throw new UOE("%s doesn't provide a merge function", this.getClass().getName());
return null;
}
/**

View File

@ -62,6 +62,7 @@ import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -130,21 +131,34 @@ public class SegmentMetadataQueryQueryToolChest extends QueryToolChest<SegmentAn
private Ordering<SegmentAnalysis> makeOrdering(SegmentMetadataQuery query)
{
if (query.isMerge()) {
// Merge everything always
return Comparators.alwaysEqual();
}
return query.getResultOrdering(); // No two elements should be equal, so it should never merge
return (Ordering<SegmentAnalysis>) SegmentMetadataQueryQueryToolChest.this.createResultComparator(query);
}
private BinaryOperator<SegmentAnalysis> createMergeFn(final SegmentMetadataQuery inQ)
{
return (arg1, arg2) -> mergeAnalyses(arg1, arg2, inQ.isLenientAggregatorMerge());
return SegmentMetadataQueryQueryToolChest.this.createMergeFn(inQ);
}
};
}
@Override
public BinaryOperator<SegmentAnalysis> createMergeFn(Query<SegmentAnalysis> query)
{
return (arg1, arg2) -> mergeAnalyses(arg1, arg2, ((SegmentMetadataQuery) query).isLenientAggregatorMerge());
}
@Override
public Comparator<SegmentAnalysis> createResultComparator(Query<SegmentAnalysis> query)
{
SegmentMetadataQuery segmentMetadataQuery = (SegmentMetadataQuery) query;
if (segmentMetadataQuery.isMerge()) {
// Merge everything always
return Comparators.alwaysEqual();
}
return segmentMetadataQuery.getResultOrdering(); // No two elements should be equal, so it should never merge
}
@Override
public QueryMetrics<Query<?>> makeMetrics(SegmentMetadataQuery query)
{

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.DateTimes;
@ -43,7 +45,9 @@ import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.timeline.LogicalSegment;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
/**
@ -112,6 +116,27 @@ public class TimeBoundaryQueryQueryToolChest
};
}
@Override
public BinaryOperator<Result<TimeBoundaryResultValue>> createMergeFn(Query<Result<TimeBoundaryResultValue>> query)
{
TimeBoundaryQuery boundQuery = (TimeBoundaryQuery) query;
return (result1, result2) -> {
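// drop null inputs, then delegate to TimeBoundaryQuery.mergeResults to collapse what remains into a single result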
final List<Result<TimeBoundaryResultValue>> mergeList;
if (result1 == null) {
mergeList = result2 != null ? ImmutableList.of(result2) : null;
} else {
mergeList = result2 != null ? ImmutableList.of(result1, result2) : ImmutableList.of(result1);
}
return Iterables.getOnlyElement(boundQuery.mergeResults(mergeList));
};
}
@Override
public Comparator<Result<TimeBoundaryResultValue>> createResultComparator(Query<Result<TimeBoundaryResultValue>> query)
{
return query.getResultOrdering();
}
@Override
public QueryMetrics<Query<?>> makeMetrics(TimeBoundaryQuery query)
{

View File

@ -27,6 +27,7 @@ import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;
/**
*/
@ -80,11 +81,7 @@ public class TimeBoundaryResultValue
TimeBoundaryResultValue that = (TimeBoundaryResultValue) o;
if (value != null ? !value.equals(that.value) : that.value != null) {
return false;
}
return true;
return Objects.equals(value, that.value);
}
@Override

View File

@ -37,6 +37,7 @@ import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.guice.annotations.Client;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.guice.http.DruidHttpClientConfig;
import org.apache.druid.java.util.common.Intervals;
@ -45,11 +46,13 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
@ -87,6 +90,8 @@ import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
@ -102,6 +107,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
private final CachePopulator cachePopulator;
private final CacheConfig cacheConfig;
private final DruidHttpClientConfig httpClientConfig;
private final DruidProcessingConfig processingConfig;
private final ForkJoinPool pool;
@Inject
public CachingClusteredClient(
@ -111,7 +118,9 @@ public class CachingClusteredClient implements QuerySegmentWalker
@Smile ObjectMapper objectMapper,
CachePopulator cachePopulator,
CacheConfig cacheConfig,
@Client DruidHttpClientConfig httpClientConfig
@Client DruidHttpClientConfig httpClientConfig,
DruidProcessingConfig processingConfig,
@Merging ForkJoinPool pool
)
{
this.warehouse = warehouse;
@ -121,6 +130,8 @@ public class CachingClusteredClient implements QuerySegmentWalker
this.cachePopulator = cachePopulator;
this.cacheConfig = cacheConfig;
this.httpClientConfig = httpClientConfig;
this.processingConfig = processingConfig;
this.pool = pool;
if (cacheConfig.isQueryCacheable(Query.GROUP_BY) && (cacheConfig.isUseCache() || cacheConfig.isPopulateCache())) {
log.warn(
@ -286,10 +297,32 @@ public class CachingClusteredClient implements QuerySegmentWalker
List<Sequence<T>> sequencesByInterval = new ArrayList<>(alreadyCachedResults.size() + segmentsByServer.size());
addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
addSequencesFromServer(sequencesByInterval, segmentsByServer);
return merge(sequencesByInterval);
});
}
private Sequence<T> merge(List<Sequence<T>> sequencesByInterval)
{
BinaryOperator<T> mergeFn = toolChest.createMergeFn(query);
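// use the parallel merge pool only when it is enabled in the processing config, the query has not opted out, and the toolchest supplies a merge function; otherwise fall back to the serial flatMerge below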
if (processingConfig.useParallelMergePool() && QueryContexts.getEnableParallelMerges(query) && mergeFn != null) {
return new ParallelMergeCombiningSequence<>(
pool,
sequencesByInterval,
query.getResultOrdering(),
mergeFn,
QueryContexts.hasTimeout(query),
QueryContexts.getTimeout(query),
QueryContexts.getPriority(query),
QueryContexts.getParallelMergeParallelism(query, processingConfig.getMergePoolDefaultMaxQueryParallelism()),
QueryContexts.getParallelMergeInitialYieldRows(query, processingConfig.getMergePoolTaskInitialYieldRows()),
QueryContexts.getParallelMergeSmallBatchRows(query, processingConfig.getMergePoolSmallBatchRows()),
processingConfig.getMergePoolTargetTaskRunTimeMillis()
);
} else {
return Sequences
.simple(sequencesByInterval)
.flatMerge(seq -> seq, query.getResultOrdering());
});
}
}
private Set<ServerToSegment> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline)

View File

@ -53,6 +53,7 @@ import org.apache.druid.utils.JvmUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
/**
*/
@ -135,6 +136,26 @@ public class DruidProcessingModule implements Module
);
}
@Provides
@ManageLifecycle
public LifecycleForkJoinPoolProvider getMergeProcessingPoolProvider(DruidProcessingConfig config)
{
return new LifecycleForkJoinPoolProvider(
config.getMergePoolParallelism(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> log.error(e, "Unhandled exception in thread [%s]", t),
true,
config.getMergePoolAwaitShutdownMillis()
);
}
@Provides
@Merging
public ForkJoinPool getMergeProcessingPool(LifecycleForkJoinPoolProvider poolProvider)
{
return poolProvider.getPool();
}
private void verifyDirectMemory(DruidProcessingConfig config)
{
try {

View File

@ -39,6 +39,7 @@ import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryPlus;
@ -65,6 +66,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
/**
*/
@ -302,13 +304,23 @@ public class CachingClusteredClientFunctionalityTest
return mergeLimit;
}
},
new DruidHttpClientConfig() {
new DruidHttpClientConfig()
{
@Override
public long getMaxQueuedBytes()
{
return 0L;
}
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
},
ForkJoinPool.commonPool()
);
}

View File

@ -69,6 +69,7 @@ import org.apache.druid.java.util.common.guava.nary.TrinaryFn;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.BySegmentResultValueClass;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
@ -153,6 +154,7 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
/**
*
@ -1380,16 +1382,13 @@ public class CachingClusteredClientTest
.bound(TimeBoundaryQuery.MAX_TIME)
.build(),
Intervals.of("2011-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("2011-01-01"), null, DateTimes.of("2011-01-02")),
makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-02")),
Intervals.of("2011-01-01/2011-01-03"),
makeTimeBoundaryResult(DateTimes.of("2011-01-02"), null, DateTimes.of("2011-01-03")),
makeTimeBoundaryResult(DateTimes.of("2011-01-03"), null, DateTimes.of("2011-01-03")),
Intervals.of("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(DateTimes.of("2011-01-05"), null, DateTimes.of("2011-01-10")),
Intervals.of("2011-01-01/2011-01-10"),
makeTimeBoundaryResult(DateTimes.of("2011-01-05T01"), null, DateTimes.of("2011-01-10"))
makeTimeBoundaryResult(DateTimes.of("2011-01-10"), null, DateTimes.of("2011-01-10"))
);
testQueryCaching(
@ -1594,19 +1593,19 @@ public class CachingClusteredClientTest
if (minTime != null && maxTime != null) {
value = ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString(),
minTime,
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
maxTime
);
} else if (maxTime != null) {
value = ImmutableMap.of(
TimeBoundaryQuery.MAX_TIME,
maxTime.toString()
maxTime
);
} else {
value = ImmutableMap.of(
TimeBoundaryQuery.MIN_TIME,
minTime.toString()
minTime
);
}
@ -2493,7 +2492,16 @@ public class CachingClusteredClientTest
{
return 0L;
}
}
},
new DruidProcessingConfig()
{
@Override
public String getFormatString()
{
return null;
}
},
ForkJoinPool.commonPool()
);
}
@ -2766,16 +2774,13 @@ public class CachingClusteredClientTest
.bound(TimeBoundaryQuery.MAX_TIME)
.build(),
Intervals.of("1970-01-01/2011-01-02"),
makeTimeBoundaryResult(DateTimes.of("1970-01-01"), null, DateTimes.of("1970-01-02")),
makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-02")),
Intervals.of("1970-01-01/2011-01-03"),
makeTimeBoundaryResult(DateTimes.of("1970-01-02"), null, DateTimes.of("1970-01-03")),
makeTimeBoundaryResult(DateTimes.of("1970-01-03"), null, DateTimes.of("1970-01-03")),
Intervals.of("1970-01-01/2011-01-10"),
makeTimeBoundaryResult(DateTimes.of("1970-01-05"), null, DateTimes.of("1970-01-10")),
Intervals.of("1970-01-01/2011-01-10"),
makeTimeBoundaryResult(DateTimes.of("1970-01-05T01"), null, DateTimes.of("1970-01-10"))
makeTimeBoundaryResult(DateTimes.of("1970-01-10"), null, DateTimes.of("1970-01-10"))
);
testQueryCaching(

View File

@ -169,6 +169,7 @@ aggregator
aggregators
ambari
analytics
async
authorizer
authorizers
autocomplete
@ -1285,10 +1286,14 @@ druid.broker.cache.useCache
druid.broker.cache.useResultLevelCache
druid.historical.cache.populateCache
druid.historical.cache.useCache
enableParallelMerge
floatSum
maxQueuedBytes
maxScatterGatherBytes
minTopNThreshold
parallelMergeInitialYieldRows
parallelMergeParallelism
parallelMergeSmallBatchRows
populateCache
populateResultLevelCache
queryId