add query metrics for broker parallel merges, off by default (#8981)

* add a bunch of metrics for broker parallel merges, off by default, and tests

* fix tests

* address review feedback

* propagateIfPossible
Author: Clint Wylie, 2019-12-06 13:42:53 -08:00 (committed by GitHub)
parent cefcfe26dc · commit 06cd30460e
12 changed files with 566 additions and 78 deletions

View File: BaseParallelMergeCombiningSequenceBenchmark.java

@ -135,7 +135,8 @@ public class BaseParallelMergeCombiningSequenceBenchmark
parallelism,
yieldAfter,
batchSize,
targetTaskTimeMillis
targetTaskTimeMillis,
null
);
}

View File: ParallelMergeCombiningSequence.java

@ -29,6 +29,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
/**
* Artisanal, locally-sourced, hand-crafted, gluten and GMO free, bespoke, free-range, organic, small-batch parallel
@ -64,7 +66,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
public static final int DEFAULT_TASK_SMALL_BATCH_NUM_ROWS = 4096;
private final ForkJoinPool workerPool;
private final List<Sequence<T>> baseSequences;
private final List<Sequence<T>> inputSequences;
private final Ordering<T> orderingFn;
private final BinaryOperator<T> combineFn;
private final int queueSize;
@ -75,11 +77,12 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final int batchSize;
private final int parallelism;
private final long targetTimeNanos;
private final Consumer<MergeCombineMetrics> metricsReporter;
private final CancellationGizmo cancellationGizmo;
public ParallelMergeCombiningSequence(
ForkJoinPool workerPool,
List<Sequence<T>> baseSequences,
List<Sequence<T>> inputSequences,
Ordering<T> orderingFn,
BinaryOperator<T> combineFn,
boolean hasTimeout,
@ -88,11 +91,12 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
int parallelism,
int yieldAfter,
int batchSize,
int targetTimeMillis
int targetTimeMillis,
Consumer<MergeCombineMetrics> reporter
)
{
this.workerPool = workerPool;
this.baseSequences = baseSequences;
this.inputSequences = inputSequences;
this.orderingFn = orderingFn;
this.combineFn = combineFn;
this.hasTimeout = hasTimeout;
@ -103,19 +107,21 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.batchSize = batchSize;
this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS);
this.queueSize = 4 * (yieldAfter / batchSize);
this.metricsReporter = reporter;
this.cancellationGizmo = new CancellationGizmo();
}
@Override
public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator)
{
if (baseSequences.isEmpty()) {
if (inputSequences.isEmpty()) {
return Sequences.<T>empty().toYielder(initValue, accumulator);
}
final BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(queueSize);
MergeCombinePartitioningAction<T> finalMergeAction = new MergeCombinePartitioningAction<>(
baseSequences,
final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size());
MergeCombinePartitioningAction<T> mergeCombineAction = new MergeCombinePartitioningAction<>(
inputSequences,
orderingFn,
combineFn,
outputQueue,
@ -126,10 +132,20 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
targetTimeNanos,
hasTimeout,
timeoutAtNanos,
metricsAccumulator,
cancellationGizmo
);
workerPool.execute(finalMergeAction);
Sequence<T> finalOutSequence = makeOutputSequenceForQueue(outputQueue, hasTimeout, timeoutAtNanos, cancellationGizmo);
workerPool.execute(mergeCombineAction);
Sequence<T> finalOutSequence = makeOutputSequenceForQueue(
outputQueue,
hasTimeout,
timeoutAtNanos,
cancellationGizmo
).withBaggage(() -> {
if (metricsReporter != null) {
metricsReporter.accept(metricsAccumulator.build());
}
});
return finalOutSequence.toYielder(initValue, accumulator);
}
@ -247,6 +263,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final long targetTimeNanos;
private final boolean hasTimeout;
private final long timeoutAt;
private final MergeCombineMetricsAccumulator metricsAccumulator;
private final CancellationGizmo cancellationGizmo;
private MergeCombinePartitioningAction(
@ -261,6 +278,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
long targetTimeNanos,
boolean hasTimeout,
long timeoutAt,
MergeCombineMetricsAccumulator metricsAccumulator,
CancellationGizmo cancellationGizmo
)
{
@ -275,6 +293,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.targetTimeNanos = targetTimeNanos;
this.hasTimeout = hasTimeout;
this.timeoutAt = timeoutAt;
this.metricsAccumulator = metricsAccumulator;
this.cancellationGizmo = cancellationGizmo;
}
@ -300,6 +319,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
for (Sequence<T> s : sequences) {
sequenceCursors.add(new YielderBatchedResultsCursor<>(new SequenceBatcher<>(s, batchSize), orderingFn));
}
MergeCombineActionMetricsAccumulator soloAccumulator = new MergeCombineActionMetricsAccumulator();
metricsAccumulator.setPartitions(Collections.emptyList());
metricsAccumulator.setMergeMetrics(soloAccumulator);
PrepareMergeCombineInputsAction<T> blockForInputsAction = new PrepareMergeCombineInputsAction<>(
sequenceCursors,
resultsPusher,
@ -308,6 +330,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
yieldAfter,
batchSize,
targetTimeNanos,
soloAccumulator,
cancellationGizmo
);
getPool().execute(blockForInputsAction);
@ -325,7 +348,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private void spawnParallelTasks(int parallelMergeTasks)
{
List<RecursiveAction> tasks = new ArrayList<>();
List<RecursiveAction> tasks = new ArrayList<>(parallelMergeTasks);
List<MergeCombineActionMetricsAccumulator> taskMetrics = new ArrayList<>(parallelMergeTasks);
List<BlockingQueue<ResultBatch<T>>> intermediaryOutputs = new ArrayList<>(parallelMergeTasks);
List<? extends List<Sequence<T>>> partitions =
@ -340,6 +365,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
for (Sequence<T> s : partition) {
partitionCursors.add(new YielderBatchedResultsCursor<>(new SequenceBatcher<>(s, batchSize), orderingFn));
}
MergeCombineActionMetricsAccumulator partitionAccumulator = new MergeCombineActionMetricsAccumulator();
PrepareMergeCombineInputsAction<T> blockForInputsAction = new PrepareMergeCombineInputsAction<>(
partitionCursors,
pusher,
@ -348,11 +374,15 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
yieldAfter,
batchSize,
targetTimeNanos,
partitionAccumulator,
cancellationGizmo
);
tasks.add(blockForInputsAction);
taskMetrics.add(partitionAccumulator);
}
metricsAccumulator.setPartitions(taskMetrics);
for (RecursiveAction task : tasks) {
getPool().execute(task);
}
@ -364,6 +394,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
new BlockingQueueuBatchedResultsCursor<>(queue, orderingFn, hasTimeout, timeoutAt)
);
}
MergeCombineActionMetricsAccumulator finalMergeMetrics = new MergeCombineActionMetricsAccumulator();
metricsAccumulator.setMergeMetrics(finalMergeMetrics);
PrepareMergeCombineInputsAction<T> finalMergeAction = new PrepareMergeCombineInputsAction<>(
intermediaryOutputsCursors,
outputPusher,
@ -372,6 +405,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
yieldAfter,
batchSize,
targetTimeNanos,
finalMergeMetrics,
cancellationGizmo
);
@ -444,9 +478,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
* this task completes and executes a new task to continue where it left off. This value is initially set by the
* {@link MergeCombinePartitioningAction} to a default value, but after that this process is timed to try and compute
* an 'optimal' number of rows to yield to achieve a task runtime of ~10ms, on the assumption that the time to process
* n results will be approximately the same. {@link #recursionDepth} is used to track how many times a task has
* continued executing, and utilized to compute a cumulative moving average of task run time per amount yielded in
* order to 'smooth' out the continual adjustment.
* n results will be approximately the same. {@link MergeCombineActionMetricsAccumulator#taskCount} is used to track
* how many times a task has continued executing, and utilized to compute a cumulative moving average of task run time
* per amount yielded in order to 'smooth' out the continual adjustment.
*/
private static class MergeCombineAction<T> extends RecursiveAction
{
@ -458,7 +492,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final int yieldAfter;
private final int batchSize;
private final long targetTimeNanos;
private final int recursionDepth;
private final MergeCombineActionMetricsAccumulator metricsAccumulator;
private final CancellationGizmo cancellationGizmo;
private MergeCombineAction(
@ -470,7 +504,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
int yieldAfter,
int batchSize,
long targetTimeNanos,
int recursionDepth,
MergeCombineActionMetricsAccumulator metricsAccumulator,
CancellationGizmo cancellationGizmo
)
{
@ -482,7 +516,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.yieldAfter = yieldAfter;
this.batchSize = batchSize;
this.targetTimeNanos = targetTimeNanos;
this.recursionDepth = recursionDepth;
this.metricsAccumulator = metricsAccumulator;
this.cancellationGizmo = cancellationGizmo;
}
@ -498,7 +532,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
ResultBatch<T> outputBatch = new ResultBatch<>(batchSize);
T currentCombinedValue = initialValue;
while (counter++ < yieldAfter && !pQueue.isEmpty()) {
while (counter < yieldAfter && !pQueue.isEmpty()) {
BatchedResultsCursor<T> cursor = pQueue.poll();
// push the queue along
@ -512,6 +546,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
cursor.close();
}
counter++;
// if current value is null, combine null with next value
if (currentCombinedValue == null) {
currentCombinedValue = combineFn.apply(null, nextValueToAccumulate);
@ -530,6 +565,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
if (batchCounter >= batchSize) {
outputQueue.offer(outputBatch);
outputBatch = new ResultBatch<>(batchSize);
metricsAccumulator.incrementOutputRows(batchCounter);
batchCounter = 0;
}
@ -540,19 +576,25 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
}
}
final long elapsedCpuNanos = JvmUtils.safeGetThreadCpuTime() - startCpuNanos;
metricsAccumulator.incrementInputRows(counter);
metricsAccumulator.incrementCpuTimeNanos(elapsedCpuNanos);
metricsAccumulator.incrementTaskCount();
if (!pQueue.isEmpty() && !cancellationGizmo.isCancelled()) {
// if there is still work to be done, execute a new task with the current accumulated value to continue
// combining where we left off
if (!outputBatch.isDrained()) {
outputQueue.offer(outputBatch);
metricsAccumulator.incrementOutputRows(batchCounter);
}
// measure the time it took to process 'yieldAfter' elements in order to project a next 'yieldAfter' value
// which we want to target a 10ms task run time. smooth this value with a cumulative moving average in order
// to prevent normal jitter in processing time from skewing the next yield value too far in any direction
final long elapsedNanos = System.nanoTime() - start;
final long elapsedCpuNanos = JvmUtils.safeGetThreadCpuTime() - startCpuNanos;
final double nextYieldAfter = Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0);
final long recursionDepth = metricsAccumulator.getTaskCount();
final double cumulativeMovingAverage =
(nextYieldAfter + (recursionDepth * yieldAfter)) / (recursionDepth + 1);
final int adjustedNextYieldAfter = (int) Math.ceil(cumulativeMovingAverage);
@ -575,21 +617,22 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
adjustedNextYieldAfter,
batchSize,
targetTimeNanos,
recursionDepth + 1,
metricsAccumulator,
cancellationGizmo
));
} else if (cancellationGizmo.isCancelled()) {
// if we got the cancellation signal, go ahead and write terminal value into output queue to help gracefully
// allow downstream stuff to stop
LOG.debug("cancelled after %s tasks", recursionDepth);
LOG.debug("cancelled after %s tasks", metricsAccumulator.getTaskCount());
outputQueue.offer(ResultBatch.TERMINAL);
} else {
// if priority queue is empty, push the final accumulated value into the output batch and push it out
outputBatch.add(currentCombinedValue);
metricsAccumulator.incrementOutputRows(batchCounter + 1L);
outputQueue.offer(outputBatch);
// ... and the terminal value to indicate the blocking queue holding the values is complete
outputQueue.offer(ResultBatch.TERMINAL);
LOG.debug("merge combine complete after %s tasks", recursionDepth);
LOG.debug("merge combine complete after %s tasks", metricsAccumulator.getTaskCount());
}
}
catch (Exception ex) {
@ -623,6 +666,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final int yieldAfter;
private final int batchSize;
private final long targetTimeNanos;
private final MergeCombineActionMetricsAccumulator metricsAccumulator;
private final CancellationGizmo cancellationGizmo;
private PrepareMergeCombineInputsAction(
@ -633,6 +677,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
int yieldAfter,
int batchSize,
long targetTimeNanos,
MergeCombineActionMetricsAccumulator metricsAccumulator,
CancellationGizmo cancellationGizmo
)
{
@ -643,6 +688,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.yieldAfter = yieldAfter;
this.batchSize = batchSize;
this.targetTimeNanos = targetTimeNanos;
this.metricsAccumulator = metricsAccumulator;
this.cancellationGizmo = cancellationGizmo;
}
@ -669,7 +715,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
yieldAfter,
batchSize,
targetTimeNanos,
1,
metricsAccumulator,
cancellationGizmo
));
} else {
@ -1110,4 +1156,198 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
return new RE(ex);
}
}
/**
* Metrics for the execution of a {@link ParallelMergeCombiningSequence} on the {@link ForkJoinPool}
*/
public static class MergeCombineMetrics
{
private final int parallelism;
private final int inputSequences;
private final long inputRows;
private final long outputRows;
private final long taskCount;
private final long totalCpuTime;
MergeCombineMetrics(
int parallelism,
int inputSequences,
long inputRows,
long outputRows,
long taskCount,
long totalCpuTime
)
{
this.parallelism = parallelism;
this.inputSequences = inputSequences;
this.inputRows = inputRows;
this.outputRows = outputRows;
this.taskCount = taskCount;
this.totalCpuTime = totalCpuTime;
}
/**
* Total number of layer 1 parallel tasks (+ 1 for total number of concurrent tasks for this query)
*/
public int getParallelism()
{
return parallelism;
}
/**
* Total number of input {@link Sequence} processed by {@link ParallelMergeCombiningSequence}
*/
public long getInputSequences()
{
return inputSequences;
}
/**
* Total number of input 'rows' processed by the {@link ParallelMergeCombiningSequence}
*/
public long getInputRows()
{
return inputRows;
}
/**
* Total number of output 'rows' produced by merging and combining the set of input {@link Sequence}s
*/
public long getOutputRows()
{
return outputRows;
}
/**
* Total number of {@link ForkJoinPool} tasks involved in executing the {@link ParallelMergeCombiningSequence},
* including {@link MergeCombinePartitioningAction}, {@link PrepareMergeCombineInputsAction}, and
* {@link MergeCombineAction}.
*/
public long getTaskCount()
{
return taskCount;
}
/**
* Total CPU time in nanoseconds during the 'hot loop' of doing actual merging and combining
* in {@link MergeCombineAction}
*/
public long getTotalCpuTime()
{
return totalCpuTime;
}
}
/**
* Holder to accumulate metrics for all work done by a {@link ParallelMergeCombiningSequence}, containing layer 1 task
* metrics in {@link #partitionMetrics} and final merge task metrics in {@link #mergeMetrics}, in order to compute
* {@link MergeCombineMetrics} after the {@link ParallelMergeCombiningSequence} is completely consumed.
*/
static class MergeCombineMetricsAccumulator
{
List<MergeCombineActionMetricsAccumulator> partitionMetrics;
MergeCombineActionMetricsAccumulator mergeMetrics;
private final int inputSequences;
MergeCombineMetricsAccumulator(int inputSequences)
{
this.inputSequences = inputSequences;
}
void setMergeMetrics(MergeCombineActionMetricsAccumulator mergeMetrics)
{
this.mergeMetrics = mergeMetrics;
}
void setPartitions(List<MergeCombineActionMetricsAccumulator> partitionMetrics)
{
this.partitionMetrics = partitionMetrics;
}
MergeCombineMetrics build()
{
long numInputRows = 0;
long cpuTimeNanos = 0;
// 1 partition task, 1 layer two prepare merge inputs task, 1 layer one prepare merge inputs task for each
// partition
long totalPoolTasks = 1 + 1 + partitionMetrics.size();
// accumulate input row count, cpu time, and total number of tasks from each partition
for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) {
numInputRows += partition.getInputRows();
cpuTimeNanos += partition.getTotalCpuTimeNanos();
totalPoolTasks += partition.getTaskCount();
}
// if serial merge done, only mergeMetrics is populated, get input rows from there instead. otherwise, ignore the
// value as it is only the number of intermediary input rows to the layer 2 task
if (partitionMetrics.isEmpty()) {
numInputRows = mergeMetrics.getInputRows();
}
// number of fjp tasks and cpu time is interesting though
totalPoolTasks += mergeMetrics.getTaskCount();
cpuTimeNanos += mergeMetrics.getTotalCpuTimeNanos();
final long numOutputRows = mergeMetrics.getOutputRows();
return new MergeCombineMetrics(
Math.max(partitionMetrics.size(), 1),
inputSequences,
numInputRows,
numOutputRows,
totalPoolTasks,
cpuTimeNanos
);
}
}
/**
* Accumulate metrics about a single chain of {@link MergeCombineAction}
*/
static class MergeCombineActionMetricsAccumulator
{
private long taskCount = 1;
private long inputRows = 0;
private long outputRows = 0;
private long totalCpuTimeNanos = 0;
void incrementTaskCount()
{
taskCount++;
}
void incrementInputRows(long numInputRows)
{
inputRows += numInputRows;
}
void incrementOutputRows(long numOutputRows)
{
outputRows += numOutputRows;
}
void incrementCpuTimeNanos(long nanos)
{
totalCpuTimeNanos += nanos;
}
long getTaskCount()
{
return taskCount;
}
long getInputRows()
{
return inputRows;
}
long getOutputRows()
{
return outputRows;
}
long getTotalCpuTimeNanos()
{
return totalCpuTimeNanos;
}
}
}
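A minimal sketch of what a caller-supplied reporter can look like, based only on the Consumer<MergeCombineMetrics> constructor parameter and the MergeCombineMetrics getters added in this diff (the logger and message format are illustrative). The reporter fires from the output sequence's baggage, so it runs once, after the merged sequence is fully consumed or closed; passing null, as the benchmark change above does, disables reporting entirely:

    Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter = metrics -> {
      // invoked once via withBaggage(...) after the output sequence is drained
      LOG.debug(
          "parallelism[%s] sequences[%s] inputRows[%s] outputRows[%s] tasks[%s] cpuTimeNanos[%s]",
          metrics.getParallelism(),
          metrics.getInputSequences(),
          metrics.getInputRows(),
          metrics.getOutputRows(),
          metrics.getTaskCount(),
          metrics.getTotalCpuTime()
      );
    };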

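To make the adaptive yield logic in MergeCombineAction above concrete, a worked example with illustrative numbers (the formula is the one in the diff; the inputs are invented):

    long targetTimeNanos = 10_000_000L;  // ~10ms target task run time
    int yieldAfter = 1000;               // rows the task that just finished was asked to process
    long elapsedCpuNanos = 5_000_000L;   // it only needed ~5ms of cpu time
    long taskCount = 1;                  // first task in this chain
    double nextYieldAfter =
        Math.max((double) targetTimeNanos * ((double) yieldAfter / elapsedCpuNanos), 1.0);          // 2000.0
    double cumulativeMovingAverage = (nextYieldAfter + (taskCount * yieldAfter)) / (taskCount + 1); // 1500.0
    int adjustedNextYieldAfter = (int) Math.ceil(cumulativeMovingAverage);                          // 1500

So the continuation task yields after 1500 rows instead of 1000, moving toward the ~10ms target while the moving average damps jitter from any single measurement.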
View File: WrappingYielder.java

@ -62,6 +62,7 @@ final class WrappingYielder<OutType> implements Yielder<OutType>
catch (Exception e) {
t.addSuppressed(e);
}
Throwables.propagateIfPossible(t);
throw new RuntimeException(t);
}
}
@ -89,6 +90,7 @@ final class WrappingYielder<OutType> implements Yielder<OutType>
t.addSuppressed(e);
}
Throwables.propagateIfInstanceOf(t, IOException.class);
Throwables.propagateIfPossible(t);
throw new RuntimeException(t);
}
// "Normal" close
@ -97,6 +99,7 @@ final class WrappingYielder<OutType> implements Yielder<OutType>
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
}
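The added Throwables.propagateIfPossible calls complete the standard Guava rethrow idiom; a self-contained sketch of the same close() handling (only the Guava methods already used in this file are assumed):

    import com.google.common.base.Throwables;
    import java.io.Closeable;
    import java.io.IOException;

    static void closeAndRethrow(Closeable baggage) throws IOException
    {
      try {
        baggage.close();
      }
      catch (Exception e) {
        Throwables.propagateIfInstanceOf(e, IOException.class); // rethrow the checked IOException as-is
        Throwables.propagateIfPossible(e);                      // rethrow RuntimeException or Error unwrapped
        throw new RuntimeException(e);                          // wrap anything else
      }
    }

Without propagateIfPossible, an unchecked exception thrown during close (for example, a timeout or cancellation RuntimeException from the parallel merge) would be needlessly wrapped in another RuntimeException, obscuring its original type.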

View File: ApplyFunction.java

@ -289,7 +289,7 @@ public interface ApplyFunction
/**
* Accumulate a value for a single array input with a 2 argument {@link LambdaExpr}. The 'array' input expression is
* the first argument, the initial value for the accumlator expression is the 2nd argument.
* the first argument, the initial value for the accumulator expression is the 2nd argument.
*/
class FoldFunction extends BaseFoldFunction
{
@ -314,10 +314,10 @@ public interface ApplyFunction
if (array == null) {
return ExprEval.of(null);
}
Object accumlator = accEval.value();
Object accumulator = accEval.value();
FoldLambdaBinding lambdaBinding = new FoldLambdaBinding(array, accumlator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumlator, lambdaBinding);
FoldLambdaBinding lambdaBinding = new FoldLambdaBinding(array, accumulator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumulator, lambdaBinding);
}
@Override
@ -340,8 +340,8 @@ public interface ApplyFunction
/**
* Accumulate a value for the cartesian product of 'n' array inputs arguments with an 'n + 1' argument
* {@link LambdaExpr}. The 'array' input expressions are the first 'n' arguments, the initial value for the accumlator
* expression is the final argument.
* {@link LambdaExpr}. The 'array' input expressions are the first 'n' arguments, the initial value for the
* accumulator expression is the final argument.
*/
class CartesianFoldFunction extends BaseFoldFunction
{
@ -385,11 +385,11 @@ public interface ApplyFunction
ExprEval accEval = accExpr.eval(bindings);
Object accumlator = accEval.value();
Object accumulator = accEval.value();
CartesianFoldLambdaBinding lambdaBindings =
new CartesianFoldLambdaBinding(product, accumlator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumlator, lambdaBindings);
new CartesianFoldLambdaBinding(product, accumulator, lambdaExpr, bindings);
return applyFold(lambdaExpr, accumulator, lambdaBindings);
}
@Override

View File: ParallelMergeCombiningSequenceTest.java

@ -41,9 +41,11 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
public class ParallelMergeCombiningSequenceTest
{
private static final int TEST_POOL_SIZE = 4;
private static final Logger LOG = new Logger(ParallelMergeCombiningSequenceTest.class);
public static final Ordering<IntPair> INT_PAIR_ORDERING = Ordering.natural().onResultOf(p -> p.lhs);
@ -65,7 +67,7 @@ public class ParallelMergeCombiningSequenceTest
public void setup()
{
pool = new ForkJoinPool(
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.75),
TEST_POOL_SIZE,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
(t, e) -> LOG.error(e, "Unhandled exception in thread [%s]", t),
true
@ -255,7 +257,7 @@ public class ParallelMergeCombiningSequenceTest
input.add(Sequences.empty());
assertResult(input);
// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
@ -273,7 +275,7 @@ public class ParallelMergeCombiningSequenceTest
input.clear();
// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(Sequences.empty());
input.add(Sequences.empty());
input.add(Sequences.empty());
@ -290,18 +292,32 @@ public class ParallelMergeCombiningSequenceTest
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
assertResult(input, 10, 20);
assertResult(input, 10, 20, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(11, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(6, reportMetrics.getOutputRows(), 5);
Assert.assertEquals(4, reportMetrics.getTaskCount());
});
input.clear();
// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(4));
input.add(nonBlockingSequence(6));
assertResult(input, 10, 20);
assertResult(input, 10, 20, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(34, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(16, reportMetrics.getOutputRows(), 15);
Assert.assertEquals(10, reportMetrics.getTaskCount(), 2);
});
}
@Test
@ -311,18 +327,32 @@ public class ParallelMergeCombiningSequenceTest
List<Sequence<IntPair>> input = new ArrayList<>();
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
assertResult(input, 4, 20);
assertResult(input, 4, 20, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(11, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(6, reportMetrics.getOutputRows(), 5);
Assert.assertEquals(4, reportMetrics.getTaskCount());
});
input.clear();
// above min sequence count threshold, so will merge in parallel (if enough cores)
// above min sequence count threshold, so will merge in parallel
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(6));
input.add(nonBlockingSequence(5));
input.add(nonBlockingSequence(8));
input.add(nonBlockingSequence(4));
input.add(nonBlockingSequence(6));
assertResult(input, 4, 20);
assertResult(input, 4, 20, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(34, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(16, reportMetrics.getOutputRows(), 15);
Assert.assertEquals(10, reportMetrics.getTaskCount(), 2);
});
}
@ -334,7 +364,14 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(15));
input.add(nonBlockingSequence(26));
assertResult(input, 5, 10);
assertResult(input, 5, 10, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(41, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(21, reportMetrics.getOutputRows(), 20);
Assert.assertEquals(4, reportMetrics.getTaskCount(), 2);
});
// above min sequence count threshold, so will merge in parallel (if enough cores)
input.add(nonBlockingSequence(15));
@ -342,7 +379,14 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(17));
input.add(nonBlockingSequence(14));
assertResult(input, 5, 10);
assertResult(input, 5, 10, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(120, reportMetrics.getInputRows());
// deltas because it depends how much result combining is happening, which is random
Assert.assertEquals(60, reportMetrics.getOutputRows(), 59);
Assert.assertEquals(10, reportMetrics.getTaskCount(), 5);
});
}
@Test
@ -372,14 +416,22 @@ public class ParallelMergeCombiningSequenceTest
input.add(nonBlockingSequence(10_000));
input.add(nonBlockingSequence(9_001));
assertResult(input, 128, 1024);
assertResult(input, 128, 1024, reportMetrics -> {
Assert.assertEquals(1, reportMetrics.getParallelism());
Assert.assertEquals(2, reportMetrics.getInputSequences());
Assert.assertEquals(19_001, reportMetrics.getInputRows());
});
input.add(nonBlockingSequence(7_777));
input.add(nonBlockingSequence(8_500));
input.add(nonBlockingSequence(5_000));
input.add(nonBlockingSequence(8_888));
assertResult(input, 128, 1024);
assertResult(input, 128, 1024, reportMetrics -> {
Assert.assertEquals(2, reportMetrics.getParallelism());
Assert.assertEquals(6, reportMetrics.getInputSequences());
Assert.assertEquals(49166, reportMetrics.getInputRows());
});
}
@Test
@ -482,12 +534,29 @@ public class ParallelMergeCombiningSequenceTest
assertResult(
sequences,
ParallelMergeCombiningSequence.DEFAULT_TASK_SMALL_BATCH_NUM_ROWS,
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS
ParallelMergeCombiningSequence.DEFAULT_TASK_INITIAL_YIELD_NUM_ROWS,
null
);
}
private void assertResult(List<Sequence<IntPair>> sequences, int batchSize, int yieldAfter)
throws InterruptedException, IOException
{
assertResult(
sequences,
batchSize,
yieldAfter,
null
);
}
private void assertResult(
List<Sequence<IntPair>> sequences,
int batchSize,
int yieldAfter,
Consumer<ParallelMergeCombiningSequence.MergeCombineMetrics> reporter
)
throws InterruptedException, IOException
{
final CombiningSequence<IntPair> combiningSequence = CombiningSequence.create(
new MergeSequence<>(INT_PAIR_ORDERING, Sequences.simple(sequences)),
@ -503,10 +572,11 @@ public class ParallelMergeCombiningSequenceTest
true,
5000,
0,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5),
TEST_POOL_SIZE,
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
reporter
);
Yielder<IntPair> combiningYielder = Yielders.each(combiningSequence);
@ -561,10 +631,11 @@ public class ParallelMergeCombiningSequenceTest
true,
timeout,
0,
(int) Math.ceil(Runtime.getRuntime().availableProcessors() * 0.5),
TEST_POOL_SIZE,
yieldAfter,
batchSize,
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS
ParallelMergeCombiningSequence.DEFAULT_TASK_TARGET_RUN_TIME_MILLIS,
null
);
Yielder<IntPair> parallelMergeCombineYielder = Yielders.each(parallelMergeCombineSequence);

View File: DefaultQueryMetrics.java

@ -186,6 +186,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
// Emit nothing by default.
}
@Override
public void parallelMergeParallelism(final int parallelism)
{
// Emit nothing by default.
}
@Override
public BitmapResultFactory<?> makeBitmapResultFactory(BitmapFactory factory)
{
@ -254,18 +260,6 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
return reportMillisTimeMetric("query/node/time", timeNs);
}
private QueryMetrics<QueryType> reportMillisTimeMetric(String metricName, long timeNs)
{
return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
}
protected QueryMetrics<QueryType> reportMetric(String metricName, Number value)
{
checkModifiedFromOwnerThread();
metrics.put(metricName, value);
return this;
}
@Override
public QueryMetrics<QueryType> reportNodeBytes(long byteCount)
{
@ -293,6 +287,48 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeParallelism(int parallelism)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeInputSequences(long numSequences)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeInputRows(long numRows)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeOutputRows(long numRows)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeTaskCount(long numTasks)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs)
{
// Don't emit by default.
return this;
}
@Override
public void emit(ServiceEmitter emitter)
{
@ -302,4 +338,16 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
}
metrics.clear();
}
protected QueryMetrics<QueryType> reportMetric(String metricName, Number value)
{
checkModifiedFromOwnerThread();
metrics.put(metricName, value);
return this;
}
private QueryMetrics<QueryType> reportMillisTimeMetric(String metricName, long timeNs)
{
return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
}
}
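Every reportParallelMerge* method above is a deliberate no-op, which is what "off by default" in the commit title means at the metrics layer. Emitting the values takes an overriding implementation; a hypothetical override using the protected reportMetric helper from this class (the metric name is illustrative, not one Druid defines):

    @Override
    public QueryMetrics<QueryType> reportParallelMergeTaskCount(long numTasks)
    {
      return reportMetric("query/parallelMerge/taskCount", numTasks); // hypothetical metric name
    }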

View File: DruidProcessingConfig.java

@ -148,22 +148,39 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig implem
}
@Config(value = "${base_path}.merge.useParallelMergePool")
public boolean useParallelMergePool()
public boolean useParallelMergePoolConfigured()
{
return true;
}
public boolean useParallelMergePool()
{
final boolean useParallelMergePoolConfigured = useParallelMergePoolConfigured();
final int parallelism = getMergePoolParallelism();
// need at least 3 to do 2 layer merge
if (parallelism > 2) {
return useParallelMergePoolConfigured;
}
if (useParallelMergePoolConfigured) {
log.debug(
"Parallel merge pool is enabled, but there are not enough cores to enable parallel merges: %s",
parallelism
);
}
return false;
}
@Config(value = "${base_path}.merge.pool.parallelism")
public int getNumThreadsMergePoolConfigured()
public int getMergePoolParallelismConfigured()
{
return DEFAULT_NUM_THREADS;
}
public int getMergePoolParallelism()
{
int numThreadsConfigured = getNumThreadsMergePoolConfigured();
if (numThreadsConfigured != DEFAULT_NUM_THREADS) {
return numThreadsConfigured;
int poolParallelismConfigured = getMergePoolParallelismConfigured();
if (poolParallelismConfigured != DEFAULT_NUM_THREADS) {
return poolParallelismConfigured;
} else {
// assume 2 hyper-threads per core, so that this value is probably by default the number of physical cores * 1.5
return (int) Math.ceil(JvmUtils.getRuntimeInfo().getAvailableProcessors() * 0.75);
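To make the gating above concrete (plain arithmetic, not Druid API):

    // nothing configured, 8 available processors:
    int parallelism = (int) Math.ceil(8 * 0.75);  // = 6
    boolean useParallel = parallelism > 2;        // true: a 2 layer merge needs at least 3 threads

    // with only 2 processors: ceil(2 * 0.75) = 2, so useParallelMergePool() returns false
    // even when ${base_path}.merge.useParallelMergePool (typically druid.processing) is set to true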

View File: QueryMetrics.java

@ -30,8 +30,8 @@ import org.joda.time.Interval;
import java.util.List;
/**
* Abstraction wrapping {@link org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder} and allowing to control what
* metrics are actually emitted, what dimensions do they have, etc.
* Abstraction wrapping {@link org.apache.druid.java.util.emitter.service.ServiceMetricEvent.Builder} and allowing to
* control what metrics are actually emitted, what dimensions do they have, etc.
*
*
* Goals of QueryMetrics
@ -113,9 +113,9 @@ import java.util.List;
*
* Making subinterfaces of QueryMetrics for emitting custom dimensions and/or metrics for specific query types
* -----------------------------------------------------------------------------------------------------------
* If a query type (e. g. {@link org.apache.druid.query.metadata.metadata.SegmentMetadataQuery} (it's runners) needs to emit
* custom dimensions and/or metrics which doesn't make sense for all other query types, the following steps should be
* executed:
* If a query type (e. g. {@link org.apache.druid.query.metadata.metadata.SegmentMetadataQuery} (it's runners) needs to
* emit custom dimensions and/or metrics which doesn't make sense for all other query types, the following steps should
* be executed:
*
* 1. Create `interface SegmentMetadataQueryMetrics extends QueryMetrics` (here and below "SegmentMetadata" is the
* query type) with additional methods (see "Adding new methods" section above).
@ -148,11 +148,11 @@ import java.util.List;
* This complex procedure is needed to ensure custom {@link GenericQueryMetricsFactory} specified by users still works
* for the query type when query type decides to create their custom QueryMetrics subclass.
*
* {@link org.apache.druid.query.topn.TopNQueryMetrics}, {@link org.apache.druid.query.groupby.GroupByQueryMetrics}, and {@link
* org.apache.druid.query.timeseries.TimeseriesQueryMetrics} are implemented differently, because they are introduced at the
* same time as the whole QueryMetrics abstraction and their default implementations have to actually emit more
* dimensions than the default generic QueryMetrics. So those subinterfaces shouldn't be taken as direct examples for
* following the plan specified above.
* {@link org.apache.druid.query.topn.TopNQueryMetrics}, {@link org.apache.druid.query.groupby.GroupByQueryMetrics}, and
* {@link org.apache.druid.query.timeseries.TimeseriesQueryMetrics} are implemented differently, because they are
* introduced at the same time as the whole QueryMetrics abstraction and their default implementations have to actually
* emit more dimensions than the default generic QueryMetrics. So those subinterfaces shouldn't be taken as direct
* examples for following the plan specified above.
*
* Refer {@link SearchQueryMetricsFactory} as an implementation example of this procedure.
*
@ -241,6 +241,13 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/
void vectorized(boolean vectorized);
/**
* Sets broker merge parallelism, if parallel merges are enabled. This will only appear in broker level metrics. This
* value is identical to the {@link #reportParallelMergeParallelism} metric value, but optionally also available as a
* dimension.
*/
void parallelMergeParallelism(int parallelism);
/**
* Creates a {@link BitmapResultFactory} which may record some information along bitmap construction from {@link
* #preFilters(List)}. The returned BitmapResultFactory may add some dimensions to this QueryMetrics from it's {@link
@ -321,6 +328,38 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/
QueryMetrics<QueryType> reportPreFilteredRows(long numRows);
/**
* Reports number of parallel tasks the broker used to process the query during parallel merge. This value is
* identical to the {@link #parallelMergeParallelism} dimension value, but optionally also available as a metric.
*/
QueryMetrics<QueryType> reportParallelMergeParallelism(int parallelism);
/**
* Reports total number of input sequences processed by the broker during parallel merge.
*/
QueryMetrics<QueryType> reportParallelMergeInputSequences(long numSequences);
/**
* Reports total number of input rows processed by the broker during parallel merge.
*/
QueryMetrics<QueryType> reportParallelMergeInputRows(long numRows);
/**
* Reports broker total number of output rows after merging and combining input sequences (should be less than or
* equal to the value supplied to {@link #reportParallelMergeInputRows}).
*/
QueryMetrics<QueryType> reportParallelMergeOutputRows(long numRows);
/**
* Reports broker total number of fork join pool tasks required to complete query
*/
QueryMetrics<QueryType> reportParallelMergeTaskCount(long numTasks);
/**
* Reports broker total CPU time in nanoseconds where fork join merge combine tasks were doing work
*/
QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs);
/**
* Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object.
*/
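As a reminder of how the subinterface plan from the class javadoc above plays out, step 1 looks roughly like this (SegmentMetadataQueryMetrics is the example name the javadoc itself uses; the body is a sketch):

    public interface SegmentMetadataQueryMetrics extends QueryMetrics<SegmentMetadataQuery>
    {
      // additional dimension and metric methods specific to this query type go here
    }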

View File: DefaultSearchQueryMetrics.java

@ -166,6 +166,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
delegateQueryMetrics.vectorized(vectorized);
}
@Override
public void parallelMergeParallelism(int parallelism)
{
delegateQueryMetrics.parallelMergeParallelism(parallelism);
}
@Override
public BitmapResultFactory<?> makeBitmapResultFactory(BitmapFactory factory)
{
@ -256,6 +262,42 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
return delegateQueryMetrics.reportPreFilteredRows(numRows);
}
@Override
public QueryMetrics reportParallelMergeParallelism(int parallelism)
{
return delegateQueryMetrics.reportParallelMergeParallelism(parallelism);
}
@Override
public QueryMetrics reportParallelMergeInputSequences(long numSequences)
{
return delegateQueryMetrics.reportParallelMergeInputSequences(numSequences);
}
@Override
public QueryMetrics reportParallelMergeInputRows(long numRows)
{
return delegateQueryMetrics.reportParallelMergeInputRows(numRows);
}
@Override
public QueryMetrics reportParallelMergeOutputRows(long numRows)
{
return delegateQueryMetrics.reportParallelMergeOutputRows(numRows);
}
@Override
public QueryMetrics reportParallelMergeTaskCount(long numTasks)
{
return delegateQueryMetrics.reportParallelMergeTaskCount(numTasks);
}
@Override
public QueryMetrics reportParallelMergeTotalCpuTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs);
}
@Override
public void emit(ServiceEmitter emitter)
{

View File: CachingClusteredClient.java

@ -55,6 +55,7 @@ import org.apache.druid.query.CacheStrategy;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
@ -316,7 +317,19 @@ public class CachingClusteredClient implements QuerySegmentWalker
QueryContexts.getParallelMergeParallelism(query, processingConfig.getMergePoolDefaultMaxQueryParallelism()),
QueryContexts.getParallelMergeInitialYieldRows(query, processingConfig.getMergePoolTaskInitialYieldRows()),
QueryContexts.getParallelMergeSmallBatchRows(query, processingConfig.getMergePoolSmallBatchRows()),
processingConfig.getMergePoolTargetTaskRunTimeMillis()
processingConfig.getMergePoolTargetTaskRunTimeMillis(),
reportMetrics -> {
QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics();
if (queryMetrics != null) {
queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism());
queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism());
queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences());
queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows());
queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows());
queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount());
queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime());
}
}
);
} else {
return Sequences

View File: CachingClusteredClientFunctionalityTest.java

@ -319,6 +319,13 @@ public class CachingClusteredClientFunctionalityTest
{
return null;
}
@Override
public int getMergePoolParallelism()
{
// fixed so same behavior across all test environments
return 4;
}
},
ForkJoinPool.commonPool()
);

View File: CachingClusteredClientTest.java

@ -2500,6 +2500,13 @@ public class CachingClusteredClientTest
{
return null;
}
@Override
public int getMergePoolParallelism()
{
// fixed so same behavior across all test environments
return 4;
}
},
ForkJoinPool.commonPool()
);