Add logging for sketches on workers (#16697)

Improve the logging of sketches on workers.
This commit is contained in:
Adarsh Sanjeev 2024-07-09 14:37:43 +05:30 committed by GitHub
parent af5399cd9d
commit 7c625356c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 10 additions and 5 deletions

View File

@ -1772,6 +1772,7 @@ public class WorkerImpl implements Worker
@Override @Override
public void onSuccess(final ClusterByStatisticsCollector result) public void onSuccess(final ClusterByStatisticsCollector result)
{ {
result.logSketches();
kernelManipulationQueue.add( kernelManipulationQueue.add(
holder -> holder ->
holder.getStageKernelMap().get(stageDefinition.getId()) holder.getStageKernelMap().get(stageDefinition.getId())

View File

@ -100,6 +100,7 @@ public class GlobalSortMaxCountShuffleSpec implements GlobalSortShuffleSpec
} else if (maxPartitions > maxNumPartitions) { } else if (maxPartitions > maxNumPartitions) {
return Either.error((long) maxPartitions); return Either.error((long) maxPartitions);
} else { } else {
collector.logSketches();
final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithMaxCount(maxPartitions); final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithMaxCount(maxPartitions);
if (generatedPartitions.size() <= maxNumPartitions) { if (generatedPartitions.size() <= maxNumPartitions) {
return Either.value(generatedPartitions); return Either.value(generatedPartitions);

View File

@ -99,6 +99,7 @@ public class GlobalSortTargetSizeShuffleSpec implements GlobalSortShuffleSpec
if (expectedPartitions > maxNumPartitions) { if (expectedPartitions > maxNumPartitions) {
return Either.error(expectedPartitions); return Either.error(expectedPartitions);
} else { } else {
collector.logSketches();
final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithTargetWeight(targetSize); final ClusterByPartitions generatedPartitions = collector.generatePartitionsWithTargetWeight(targetSize);
if (generatedPartitions.size() <= maxNumPartitions) { if (generatedPartitions.size() <= maxNumPartitions) {
return Either.value(generatedPartitions); return Either.value(generatedPartitions);

View File

@ -90,6 +90,11 @@ public interface ClusterByStatisticsCollector
*/ */
ClusterByPartitions generatePartitionsWithMaxCount(int maxNumPartitions); ClusterByPartitions generatePartitionsWithMaxCount(int maxNumPartitions);
/**
* Logs some information regarding the collector. This is useful for seeing which sketches were downsampled the most.
*/
void logSketches();
/** /**
* Returns an immutable, JSON-serializable snapshot of this collector. * Returns an immutable, JSON-serializable snapshot of this collector.
*/ */

View File

@ -243,8 +243,6 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
@Override @Override
public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetWeight) public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetWeight)
{ {
logSketches();
if (targetWeight < 1) { if (targetWeight < 1) {
throw new IAE("Target weight must be positive"); throw new IAE("Target weight must be positive");
} }
@ -288,8 +286,6 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
@Override @Override
public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartitions) public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartitions)
{ {
logSketches();
if (maxNumPartitions < 1) { if (maxNumPartitions < 1) {
throw new IAE("Must have at least one partition"); throw new IAE("Must have at least one partition");
} else if (buckets.isEmpty()) { } else if (buckets.isEmpty()) {
@ -331,7 +327,8 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
return ranges; return ranges;
} }
private void logSketches() @Override
public void logSketches()
{ {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
// Log all sketches // Log all sketches