fix bug with broker parallel merge metrics emitting, add wall time, fast/slow partition time metrics (#13420)

This commit is contained in:
Clint Wylie 2022-12-06 17:50:59 -08:00 committed by GitHub
parent 83261f9641
commit 37d8833125
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 14 deletions

View File

@ -76,6 +76,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final BinaryOperator<T> combineFn; private final BinaryOperator<T> combineFn;
private final int queueSize; private final int queueSize;
private final boolean hasTimeout; private final boolean hasTimeout;
private final long startTimeNanos;
private final long timeoutAtNanos; private final long timeoutAtNanos;
private final int yieldAfter; private final int yieldAfter;
private final int batchSize; private final int batchSize;
@ -105,12 +106,13 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.orderingFn = orderingFn; this.orderingFn = orderingFn;
this.combineFn = combineFn; this.combineFn = combineFn;
this.hasTimeout = hasTimeout; this.hasTimeout = hasTimeout;
this.timeoutAtNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS); this.startTimeNanos = System.nanoTime();
this.timeoutAtNanos = startTimeNanos + TimeUnit.NANOSECONDS.convert(timeoutMillis, TimeUnit.MILLISECONDS);
this.parallelism = parallelism; this.parallelism = parallelism;
this.yieldAfter = yieldAfter; this.yieldAfter = yieldAfter;
this.batchSize = batchSize; this.batchSize = batchSize;
this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS); this.targetTimeNanos = TimeUnit.NANOSECONDS.convert(targetTimeMillis, TimeUnit.MILLISECONDS);
this.queueSize = 4 * (yieldAfter / batchSize); this.queueSize = (1 << 15) / batchSize; // each queue can by default hold ~32k rows
this.metricsReporter = reporter; this.metricsReporter = reporter;
this.cancellationGizmo = new CancellationGizmo(); this.cancellationGizmo = new CancellationGizmo();
} }
@ -121,8 +123,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
if (inputSequences.isEmpty()) { if (inputSequences.isEmpty()) {
return Sequences.<T>empty().toYielder(initValue, accumulator); return Sequences.<T>empty().toYielder(initValue, accumulator);
} }
// we make final output queue larger than the merging queues so if downstream readers are slower to read there is
final BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(queueSize); // less chance of blocking the merge
final BlockingQueue<ResultBatch<T>> outputQueue = new ArrayBlockingQueue<>(4 * queueSize);
final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size()); final MergeCombineMetricsAccumulator metricsAccumulator = new MergeCombineMetricsAccumulator(inputSequences.size());
MergeCombinePartitioningAction<T> mergeCombineAction = new MergeCombinePartitioningAction<>( MergeCombinePartitioningAction<T> mergeCombineAction = new MergeCombinePartitioningAction<>(
inputSequences, inputSequences,
@ -147,6 +150,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
cancellationGizmo cancellationGizmo
).withBaggage(() -> { ).withBaggage(() -> {
if (metricsReporter != null) { if (metricsReporter != null) {
metricsAccumulator.setTotalWallTime(System.nanoTime() - startTimeNanos);
metricsReporter.accept(metricsAccumulator.build()); metricsReporter.accept(metricsAccumulator.build());
} }
}); });
@ -698,6 +702,8 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final MergeCombineActionMetricsAccumulator metricsAccumulator; private final MergeCombineActionMetricsAccumulator metricsAccumulator;
private final CancellationGizmo cancellationGizmo; private final CancellationGizmo cancellationGizmo;
private final long startTime;
private PrepareMergeCombineInputsAction( private PrepareMergeCombineInputsAction(
List<BatchedResultsCursor<T>> partition, List<BatchedResultsCursor<T>> partition,
QueuePusher<ResultBatch<T>> outputQueue, QueuePusher<ResultBatch<T>> outputQueue,
@ -719,6 +725,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.targetTimeNanos = targetTimeNanos; this.targetTimeNanos = targetTimeNanos;
this.metricsAccumulator = metricsAccumulator; this.metricsAccumulator = metricsAccumulator;
this.cancellationGizmo = cancellationGizmo; this.cancellationGizmo = cancellationGizmo;
this.startTime = System.nanoTime();
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -736,7 +743,6 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
cursor.close(); cursor.close();
} }
} }
if (cursors.size() > 0) { if (cursors.size() > 0) {
getPool().execute(new MergeCombineAction<T>( getPool().execute(new MergeCombineAction<T>(
cursors, cursors,
@ -753,6 +759,7 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
} else { } else {
outputQueue.offer(ResultBatch.TERMINAL); outputQueue.offer(ResultBatch.TERMINAL);
} }
metricsAccumulator.setPartitionInitializedTime(System.nanoTime() - startTime);
} }
catch (Throwable t) { catch (Throwable t) {
closeAllCursors(partition); closeAllCursors(partition);
@ -1195,6 +1202,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private final long outputRows; private final long outputRows;
private final long taskCount; private final long taskCount;
private final long totalCpuTime; private final long totalCpuTime;
private final long totalWallTime;
private final long fastestPartitionInitializedTime;
private final long slowestPartitionInitializedTime;
MergeCombineMetrics( MergeCombineMetrics(
int parallelism, int parallelism,
@ -1202,7 +1212,10 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
long inputRows, long inputRows,
long outputRows, long outputRows,
long taskCount, long taskCount,
long totalCpuTime long totalCpuTime,
long totalWallTime,
long fastestPartitionInitializedTime,
long slowestPartitionInitializedTime
) )
{ {
this.parallelism = parallelism; this.parallelism = parallelism;
@ -1211,6 +1224,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.outputRows = outputRows; this.outputRows = outputRows;
this.taskCount = taskCount; this.taskCount = taskCount;
this.totalCpuTime = totalCpuTime; this.totalCpuTime = totalCpuTime;
this.totalWallTime = totalWallTime;
this.fastestPartitionInitializedTime = fastestPartitionInitializedTime;
this.slowestPartitionInitializedTime = slowestPartitionInitializedTime;
} }
/** /**
@ -1263,6 +1279,21 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
{ {
return totalCpuTime; return totalCpuTime;
} }
public long getTotalTime()
{
return totalWallTime;
}
public long getFastestPartitionInitializedTime()
{
return fastestPartitionInitializedTime;
}
public long getSlowestPartitionInitializedTime()
{
return slowestPartitionInitializedTime;
}
} }
/** /**
@ -1274,6 +1305,9 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
{ {
List<MergeCombineActionMetricsAccumulator> partitionMetrics; List<MergeCombineActionMetricsAccumulator> partitionMetrics;
MergeCombineActionMetricsAccumulator mergeMetrics; MergeCombineActionMetricsAccumulator mergeMetrics;
private long totalWallTime;
private final int inputSequences; private final int inputSequences;
MergeCombineMetricsAccumulator(int inputSequences) MergeCombineMetricsAccumulator(int inputSequences)
@ -1291,6 +1325,11 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
this.partitionMetrics = partitionMetrics; this.partitionMetrics = partitionMetrics;
} }
void setTotalWallTime(long time)
{
this.totalWallTime = time;
}
MergeCombineMetrics build() MergeCombineMetrics build()
{ {
long numInputRows = 0; long numInputRows = 0;
@ -1299,11 +1338,20 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
// partition // partition
long totalPoolTasks = 1 + 1 + partitionMetrics.size(); long totalPoolTasks = 1 + 1 + partitionMetrics.size();
long fastestPartInitialized = partitionMetrics.size() > 0 ? Long.MAX_VALUE : mergeMetrics.getPartitionInitializedtime();
long slowestPartInitialied = partitionMetrics.size() > 0 ? Long.MIN_VALUE : mergeMetrics.getPartitionInitializedtime();
// accumulate input row count, cpu time, and total number of tasks from each partition // accumulate input row count, cpu time, and total number of tasks from each partition
for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) { for (MergeCombineActionMetricsAccumulator partition : partitionMetrics) {
numInputRows += partition.getInputRows(); numInputRows += partition.getInputRows();
cpuTimeNanos += partition.getTotalCpuTimeNanos(); cpuTimeNanos += partition.getTotalCpuTimeNanos();
totalPoolTasks += partition.getTaskCount(); totalPoolTasks += partition.getTaskCount();
if (partition.getPartitionInitializedtime() < fastestPartInitialized) {
fastestPartInitialized = partition.getPartitionInitializedtime();
}
if (partition.getPartitionInitializedtime() > slowestPartInitialied) {
slowestPartInitialied = partition.getPartitionInitializedtime();
}
} }
// if serial merge done, only mergeMetrics is populated, get input rows from there instead. otherwise, ignore the // if serial merge done, only mergeMetrics is populated, get input rows from there instead. otherwise, ignore the
// value as it is only the number of intermediary input rows to the layer 2 task // value as it is only the number of intermediary input rows to the layer 2 task
@ -1322,7 +1370,10 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
numInputRows, numInputRows,
numOutputRows, numOutputRows,
totalPoolTasks, totalPoolTasks,
cpuTimeNanos cpuTimeNanos,
totalWallTime,
fastestPartInitialized,
slowestPartInitialied
); );
} }
} }
@ -1337,6 +1388,8 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
private long outputRows = 0; private long outputRows = 0;
private long totalCpuTimeNanos = 0; private long totalCpuTimeNanos = 0;
private long partitionInitializedtime = 0L;
void incrementTaskCount() void incrementTaskCount()
{ {
taskCount++; taskCount++;
@ -1357,6 +1410,11 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
totalCpuTimeNanos += nanos; totalCpuTimeNanos += nanos;
} }
void setPartitionInitializedTime(long nanos)
{
partitionInitializedtime = nanos;
}
long getTaskCount() long getTaskCount()
{ {
return taskCount; return taskCount;
@ -1376,6 +1434,11 @@ public class ParallelMergeCombiningSequence<T> extends YieldingSequenceBase<T>
{ {
return totalCpuTimeNanos; return totalCpuTimeNanos;
} }
long getPartitionInitializedtime()
{
return partitionInitializedtime;
}
} }
private static <T> void closeAllCursors(final Collection<BatchedResultsCursor<T>> cursors) private static <T> void closeAllCursors(final Collection<BatchedResultsCursor<T>> cursors)

View File

@ -352,6 +352,27 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
return this; return this;
} }
@Override
public QueryMetrics<QueryType> reportParallelMergeTotalTime(long timeNs)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeFastestPartitionTime(long timeNs)
{
// Don't emit by default.
return this;
}
@Override
public QueryMetrics<QueryType> reportParallelMergeSlowestPartitionTime(long timeNs)
{
// Don't emit by default.
return this;
}
@Override @Override
public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount) public QueryMetrics<QueryType> reportQueriedSegmentCount(long segmentCount)
{ {

View File

@ -396,6 +396,30 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/ */
QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs); QueryMetrics<QueryType> reportParallelMergeTotalCpuTime(long timeNs);
/**
* Reports broker total "wall" time in nanoseconds from parallel merge start sequence creation to total
* consumption.
*/
QueryMetrics<QueryType> reportParallelMergeTotalTime(long timeNs);
/**
* Reports broker "wall" time in nanoseconds for the fastest parallel merge sequence partition to be 'initialized',
* where 'initialized' is time to the first result batch is populated from data servers and merging can begin.
*
* Similar to query 'time to first byte' metrics, except is a composite of the whole group of data servers which are
* present in the merge partition, which all must supply an initial result batch before merging can actually begin.
*/
QueryMetrics<QueryType> reportParallelMergeFastestPartitionTime(long timeNs);
/**
* Reports broker "wall" time in nanoseconds for the slowest parallel merge sequence partition to be 'initialized',
* where 'initialized' is time to the first result batch is populated from data servers and merging can begin.
*
* Similar to query 'time to first byte' metrics, except is a composite of the whole group of data servers which are
* present in the merge partition, which all must supply an initial result batch before merging can actually begin.
*/
QueryMetrics<QueryType> reportParallelMergeSlowestPartitionTime(long timeNs);
/** /**
* Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object. * Emits all metrics, registered since the last {@code emit()} call on this QueryMetrics object.
*/ */

View File

@ -303,6 +303,24 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs); return delegateQueryMetrics.reportParallelMergeTotalCpuTime(timeNs);
} }
@Override
public QueryMetrics reportParallelMergeTotalTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeTotalTime(timeNs);
}
@Override
public QueryMetrics reportParallelMergeFastestPartitionTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeFastestPartitionTime(timeNs);
}
@Override
public QueryMetrics reportParallelMergeSlowestPartitionTime(long timeNs)
{
return delegateQueryMetrics.reportParallelMergeSlowestPartitionTime(timeNs);
}
@Override @Override
public QueryMetrics reportQueriedSegmentCount(long segmentCount) public QueryMetrics reportQueriedSegmentCount(long segmentCount)
{ {

View File

@ -403,12 +403,17 @@ public class CachingClusteredClient implements QuerySegmentWalker
QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics(); QueryMetrics<?> queryMetrics = queryPlus.getQueryMetrics();
if (queryMetrics != null) { if (queryMetrics != null) {
queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism()); queryMetrics.parallelMergeParallelism(reportMetrics.getParallelism());
queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()); queryMetrics.reportParallelMergeParallelism(reportMetrics.getParallelism()).emit(emitter);
queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()); queryMetrics.reportParallelMergeInputSequences(reportMetrics.getInputSequences()).emit(emitter);
queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()); queryMetrics.reportParallelMergeInputRows(reportMetrics.getInputRows()).emit(emitter);
queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()); queryMetrics.reportParallelMergeOutputRows(reportMetrics.getOutputRows()).emit(emitter);
queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()); queryMetrics.reportParallelMergeTaskCount(reportMetrics.getTaskCount()).emit(emitter);
queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()); queryMetrics.reportParallelMergeTotalCpuTime(reportMetrics.getTotalCpuTime()).emit(emitter);
queryMetrics.reportParallelMergeTotalTime(reportMetrics.getTotalTime()).emit(emitter);
queryMetrics.reportParallelMergeSlowestPartitionTime(reportMetrics.getSlowestPartitionInitializedTime())
.emit(emitter);
queryMetrics.reportParallelMergeFastestPartitionTime(reportMetrics.getFastestPartitionInitializedTime())
.emit(emitter);
} }
} }
); );
@ -884,7 +889,6 @@ public class CachingClusteredClient implements QuerySegmentWalker
return null; return null;
} }
return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk); return new PartitionChunkEntry<>(spec.getInterval(), spec.getVersion(), chunk);
} }
} }
} }