diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java index 7bb80aae898..da790424068 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java +++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java @@ -76,6 +76,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase private final BinaryOperator combineFn; private final int queueSize; private final boolean hasTimeout; + private final long startTimeNanos; private final long timeoutAtNanos; private final int yieldAfter; private final int batchSize; @@ -105,12 +106,13 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase this.orderingFn = orderingFn; this.combineFn = combineFn; this.hasTimeout = hasTimeout; - this.timeoutAtNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS); + this.startTimeNanos = System.nanoTime(); + this.timeoutAtNanos = startTimeNanos + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS); this.parallelism = parallelism; this.yieldAfter = yieldAfter; this.batchSize = batchSize; this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS); - this.queueSize = 4 * (yieldAfter / batchSize); + this.queueSize = (1 << 15) / batchSize; // each queue can by default hold ~32k rows this.metricsReporter = reporter; this.cancellationGizmo = new CancellationGizmo(); } @@ -121,8 +123,9 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase if (inputSequences.isEmpty()) { return Sequences.empty().toYielder(initValue, accumulator); } - - final BlockingQueue> outputQueue = new ArrayBlockingQueue<>(queueSize); + // we make final output queue larger than the merging queues so if downstream readers are slower to read there is + // less chance of blocking the merge + final BlockingQueue> outputQueue = new ArrayBlockingQueue<>(4 * queueSize); final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size()); MergeCombinePartitioningAction mergeCombineAction = new MergeCombinePartitioningAction<>( inputSequences, @@ -147,6 +150,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase cancellationGizmo ).withBaggage(() -> { if (metricsReporter != null) { + metricsAccumulator.setTotalWallTime(System.nanoTime() - startTimeNanos); metricsReporter.accept(metricsAccumulator.build()); } }); @@ -698,6 +702,8 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase private final MergeCombineActionMetricsAccumulator metricsAccumulator; private final CancellationGizmo cancellationGizmo; + private final long startTime; + private PrepareMergeCombineInputsAction( List> partition, QueuePusher> outputQueue, @@ -719,6 +725,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase this.targetTimeNanos = targetTimeNanos; this.metricsAccumulator = metricsAccumulator; this.cancellationGizmo = cancellationGizmo; + this.startTime = System.nanoTime(); } @SuppressWarnings("unchecked") @@ -736,7 +743,6 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase cursor.close(); } } - if (cursors.size() > 0) { getPool().execute(new MergeCombineAction( cursors, @@ -753,6 +759,7 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase } else { outputQueue.offer(ResultBatch.TERMINAL); } + metricsAccumulator.setPartitionInitializedTime(System.nanoTime() - startTime); } catch (Throwable t) { closeAllCursors(partition); @@ -1195,6 +1202,9 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase private final long outputRows; private final long taskCount; private final long totalCpuTime; + private final long totalWallTime; + private final long fastestPartitionInitializedTime; + private final long slowestPartitionInitializedTime; MergeCombineMetrics( int parallelism, @@ -1202,7 +1212,10 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase long inputRows, long outputRows, long taskCount, - long totalCpuTime + long totalCpuTime, + long totalWallTime, + long fastestPartitionInitializedTime, + long slowestPartitionInitializedTime ) { this.parallelism = parallelism; @@ -1211,6 +1224,9 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase this.outputRows = outputRows; this.taskCount = taskCount; this.totalCpuTime = totalCpuTime; + this.totalWallTime = totalWallTime; + this.fastestPartitionInitializedTime = fastestPartitionInitializedTime; + this.slowestPartitionInitializedTime = slowestPartitionInitializedTime; } /** @@ -1263,6 +1279,21 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase { return totalCpuTime; } + + public long getTotalTime() + { + return totalWallTime; + } + + public long getFastestPartitionInitializedTime() + { + return fastestPartitionInitializedTime; + } + + public long getSlowestPartitionInitializedTime() + { + return slowestPartitionInitializedTime; + } } /** @@ -1274,6 +1305,9 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase { List partitionMetrics; MergeCombineActionMetricsAccumulator mergeMetrics; + + private long totalWallTime; + private final int inputSequences; MergeCombineMetricsAccumulator(int inputSequences) @@ -1291,6 +1325,11 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase this.partitionMetrics = partitionMetrics; } + void setTotalWallTime(long time) + { + this.totalWallTime = time; + } + MergeCombineMetrics build() { long numInputRows = 0; @@ -1299,11 +1338,20 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase // partition long totalPoolTasks = 1 + 1 + partitionMetrics.size(); + long fastestPartInitialized = partitionMetrics.size() > 0 ? Long.MAX_VALUE : mergeMetrics.getPartitionInitializedtime(); + long slowestPartInitialied = partitionMetrics.size() > 0 ? Long.MIN_VALUE : mergeMetrics.getPartitionInitializedtime(); + // accumulate input row count, cpu time, and total number of tasks from each partition for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) { numInputRows += partition.getInputRows(); cpuTimeNanos += partition.getTotalCpuTimeNanos(); totalPoolTasks += partition.getTaskCount(); + if (partition.getPartitionInitializedtime() < fastestPartInitialized) { + fastestPartInitialized = partition.getPartitionInitializedtime(); + } + if (partition.getPartitionInitializedtime() > slowestPartInitialied) { + slowestPartInitialied = partition.getPartitionInitializedtime(); + } } // if serial merge done, only mergeMetrics is populated, get input rows from there instead. otherwise, ignore the // value as it is only the number of intermediary input rows to the layer 2 task @@ -1322,7 +1370,10 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase numInputRows, numOutputRows, totalPoolTasks, - cpuTimeNanos + cpuTimeNanos, + totalWallTime, + fastestPartInitialized, + slowestPartInitialied ); } } @@ -1337,6 +1388,8 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase private long outputRows = 0; private long totalCpuTimeNanos = 0; + private long partitionInitializedtime = 0L; + void incrementTaskCount() { taskCount++; @@ -1357,6 +1410,11 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase totalCpuTimeNanos += nanos; } + void setPartitionInitializedTime(long nanos) + { + partitionInitializedtime = nanos; + } + long getTaskCount() { return taskCount; @@ -1376,6 +1434,11 @@ public class ParallelMergeCombiningSequence extends YieldingSequenceBase { return totalCpuTimeNanos; } + + long getPartitionInitializedtime() + { + return partitionInitializedtime; + } } private static void closeAllCursors(final Collection> cursors) diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index 441b36d5bb5..6cab0252e7f 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -352,6 +352,27 @@ public class DefaultQueryMetrics> implements QueryMet return this; } + @Override + public QueryMetrics reportParallelMergeTotalTime(long timeNs) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportParallelMergeFastestPartitionTime(long timeNs) + { + // Don't emit by default. + return this; + } + + @Override + public QueryMetrics reportParallelMergeSlowestPartitionTime(long timeNs) + { + // Don't emit by default. + return this; + } + @Override public QueryMetrics reportQueriedSegmentCount(long segmentCount) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index f4d71060bf7..ddbcfc0ed53 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -396,6 +396,30 @@ public interface QueryMetrics> */ QueryMetrics reportParallelMergeTotalCpuTime(long timeNs); + /** + * Reports broker total "wall" time in nanoseconds from parallel merge start sequence creation to total + * consumption. + */ + QueryMetrics reportParallelMergeTotalTime(long timeNs); + + /** + * Reports broker "wall" time in nanoseconds for the fastest parallel merge sequence partition to be 'initialized', + * where 'initialized' is time to the first result batch is populated from data servers and merging can begin. + * + * Similar to query 'time to first byte' metrics, except is a composite of the whole group of data servers which are + * present in the merge partition, which all must supply an initial result batch before merging can actually begin. + */ + QueryMetrics reportParallelMergeFastestPartitionTime(long timeNs); + + /** + * Reports broker "wall" time in nanoseconds for the slowest parallel merge sequence partition to be 'initialized', + * where 'initialized' is time to the first result batch is populated from data servers and merging can begin. + * + * Similar to query 'time to first byte' metrics, except is a composite of the whole group of data servers which are + * present in the merge partition, which all must supply an initial result batch before merging can actually begin. + */ + QueryMetrics reportParallelMergeSlowestPartitionTime(long timeNs); + /** * Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object. */ diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index 2481eeb35f2..eeb9b976a99 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -303,6 +303,24 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs); } + @Override + public QueryMetrics reportParallelMergeTotalTime(long timeNs) + { + return delegateQueryMetrics.reportParallelMergeTotalTime(timeNs); + } + + @Override + public QueryMetrics reportParallelMergeFastestPartitionTime(long timeNs) + { + return delegateQueryMetrics.reportParallelMergeFastestPartitionTime(timeNs); + } + + @Override + public QueryMetrics reportParallelMergeSlowestPartitionTime(long timeNs) + { + return delegateQueryMetrics.reportParallelMergeSlowestPartitionTime(timeNs); + } + @Override public QueryMetrics reportQueriedSegmentCount(long segmentCount) { diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java index de5fdc5db19..52b36a0276e 100644 --- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java +++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java @@ -403,12 +403,17 @@ public class CachingClusteredClient implements QuerySegmentWalker QueryMetrics queryMetrics = queryPlus.getQueryMetrics(); if (queryMetrics != null) { queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism()); - queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()); - queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()); - queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()); - queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()); - queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()); - queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()); + queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()).emit(emitter); + queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()).emit(emitter); + queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()).emit(emitter); + queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()).emit(emitter); + queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()).emit(emitter); + queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()).emit(emitter); + queryMetrics.reportParallelMergeTotalTime(reportMetrics.getTotalTime()).emit(emitter); + queryMetrics.reportParallelMergeSlowestPartitionTime(reportMetrics.getSlowestPartitionInitializedTime()) + .emit(emitter); + queryMetrics.reportParallelMergeFastestPartitionTime(reportMetrics.getFastestPartitionInitializedTime()) + .emit(emitter); } } ); @@ -884,7 +889,6 @@ public class CachingClusteredClient implements QuerySegmentWalker return null; } return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk); - } } }