Add logging for downsampling sketches in MSQ (#14580)

* Add more logs for downsampling sketches

* Fix builds

* Lower log level

* Add new log message
This commit is contained in:
Adarsh Sanjeev 2023-08-02 20:07:54 +05:30 committed by GitHub
parent 955734ba8d
commit 6837a7be19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 108 additions and 14 deletions

View File

@ -30,6 +30,8 @@ import org.apache.druid.frame.key.RowKey;
import org.apache.druid.frame.key.RowKeyReader;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.RowSignature;
import java.util.ArrayList;
@ -48,6 +50,7 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
// for an objectively faster and more accurate solution instead of finding the best match with the following parameters
private static final int MAX_COUNT_MAX_ITERATIONS = 500;
private static final double MAX_COUNT_ITERATION_GROWTH_FACTOR = 1.05;
private final Logger log = new Logger(ClusterByStatisticsCollectorImpl.class);
private final ClusterBy clusterBy;
private final RowKeyReader keyReader;
@ -127,6 +130,7 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
totalRetainedBytes += bucketHolder.updateRetainedBytes();
if (totalRetainedBytes > maxRetainedBytes) {
log.debug("Downsampling ClusterByStatisticsCollector as totalRetainedBytes[%s] is greater than maxRetainedBytes[%s]", totalRetainedBytes, maxRetainedBytes);
downSample();
}
@ -148,6 +152,7 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
totalRetainedBytes += bucketHolder.updateRetainedBytes();
if (totalRetainedBytes > maxRetainedBytes) {
log.debug("Downsampling ClusterByStatisticsCollector as totalRetainedBytes[%s] is greater than maxRetainedBytes[%s]", totalRetainedBytes, maxRetainedBytes);
downSample();
}
}
@ -179,6 +184,7 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
totalRetainedBytes += bucketHolder.updateRetainedBytes();
if (totalRetainedBytes > maxRetainedBytes) {
log.debug("Downsampling ClusterByStatisticsCollector as totalRetainedBytes[%s] is greater than maxRetainedBytes[%s]", totalRetainedBytes, maxRetainedBytes);
downSample();
}
}
@ -227,6 +233,8 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
@Override
public ClusterByPartitions generatePartitionsWithTargetWeight(final long targetWeight)
{
logSketches();
if (targetWeight < 1) {
throw new IAE("Target weight must be positive");
}
@ -270,6 +278,8 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
@Override
public ClusterByPartitions generatePartitionsWithMaxCount(final int maxNumPartitions)
{
logSketches();
if (maxNumPartitions < 1) {
throw new IAE("Must have at least one partition");
} else if (buckets.isEmpty()) {
@ -311,6 +321,28 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
return ranges;
}
private void logSketches()
{
if (log.isDebugEnabled()) {
// Log all sketches
List<KeyCollector<?>> keyCollectors = buckets.values()
.stream()
.map(bucketHolder -> bucketHolder.keyCollector)
.sorted(Comparator.comparingInt(KeyCollector::sketchAccuracyFactor))
.collect(Collectors.toList());
log.debug("KeyCollectors at partition generation: [%s]", keyCollectors);
} else {
// Log the 5 least accurate sketches
List<KeyCollector<?>> limitedKeyCollectors = buckets.values()
.stream()
.map(bucketHolder -> bucketHolder.keyCollector)
.sorted(Comparator.comparingInt(KeyCollector::sketchAccuracyFactor))
.limit(5)
.collect(Collectors.toList());
log.info("Most downsampled keyCollectors: [%s]", limitedKeyCollectors);
}
}
@Override
public ClusterByStatisticsSnapshot snapshot()
{
@ -380,32 +412,37 @@ public class ClusterByStatisticsCollectorImpl implements ClusterByStatisticsColl
long newTotalRetainedBytes = totalRetainedBytes;
final long targetTotalRetainedBytes = totalRetainedBytes / 2;
final List<BucketHolder> sortedHolders = new ArrayList<>(buckets.size());
final List<Pair<Long, BucketHolder>> sortedHolders = new ArrayList<>(buckets.size());
final RowKeyReader trimmedRowReader = keyReader.trimmedKeyReader(clusterBy.getBucketByCount());
// Only consider holders with more than one retained key. Holders with a single retained key cannot be downsampled.
for (final BucketHolder holder : buckets.values()) {
if (holder.keyCollector.estimatedRetainedKeys() > 1) {
sortedHolders.add(holder);
for (final Map.Entry<RowKey, BucketHolder> entry : buckets.entrySet()) {
BucketHolder bucketHolder = entry.getValue();
if (bucketHolder != null && bucketHolder.keyCollector.estimatedRetainedKeys() > 1) {
Long timeChunk = clusterBy.getBucketByCount() == 0 ? null : (Long) trimmedRowReader.read(entry.getKey(), 0);
sortedHolders.add(Pair.of(timeChunk, bucketHolder));
}
}
// Downsample least-dense buckets first. (They're less likely to need high resolution.)
sortedHolders.sort(
Comparator.comparing((BucketHolder holder) ->
(double) holder.keyCollector.estimatedTotalWeight()
/ holder.keyCollector.estimatedRetainedKeys())
Comparator.comparing((Pair<Long, BucketHolder> pair) ->
(double) pair.rhs.keyCollector.estimatedTotalWeight()
/ pair.rhs.keyCollector.estimatedRetainedKeys())
);
int i = 0;
while (i < sortedHolders.size() && newTotalRetainedBytes > targetTotalRetainedBytes) {
final BucketHolder bucketHolder = sortedHolders.get(i);
final Long timeChunk = sortedHolders.get(i).lhs;
final BucketHolder bucketHolder = sortedHolders.get(i).rhs;
// Ignore false return, because we wrap all collectors in DelegateOrMinKeyCollector and can be assured that
// it will downsample all the way to one if needed. Can't do better than that.
log.debug("Downsampling sketch for timeChunk [%s]: [%s]", timeChunk, bucketHolder.keyCollector);
bucketHolder.keyCollector.downSample();
newTotalRetainedBytes += bucketHolder.updateRetainedBytes();
if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 1).retainedBytes > bucketHolder.retainedBytes || bucketHolder.keyCollector.estimatedRetainedKeys() <= 1) {
if (i == sortedHolders.size() - 1 || sortedHolders.get(i + 1).rhs.retainedBytes > bucketHolder.retainedBytes || bucketHolder.keyCollector.estimatedRetainedKeys() <= 1) {
i++;
}
}

View File

@ -34,11 +34,11 @@ import java.util.Optional;
/**
* Delegates to some other kind of {@link KeyCollector} at first, until its {@link #downSample()} fails to downsample.
* At that point, the delegate collector is nulled out and this collector starts tracking the min key instead.
*
* <br>
* This is useful because it allows us to wrap any {@link KeyCollector} and enable downsampling to a single key, even
* if the original collector does not support that. For example, {@link QuantilesSketchKeyCollector} cannot downsample
* below K = 2, which retains more than one key.
*
* <br>
* Created by {@link DelegateOrMinKeyCollectorFactory}.
*/
public class DelegateOrMinKeyCollector<TDelegate extends KeyCollector<TDelegate>>
@ -177,4 +177,19 @@ public class DelegateOrMinKeyCollector<TDelegate extends KeyCollector<TDelegate>
return ClusterByPartitions.oneUniversalPartition();
}
}
@Override
public int sketchAccuracyFactor()
{
return delegate == null ? Integer.MIN_VALUE : delegate.sketchAccuracyFactor();
}
@Override
public String toString()
{
return "DelegateOrMinKeyCollector{" +
"delegate=" + delegate +
", minKey=" + minKey +
'}';
}
}

View File

@ -245,6 +245,12 @@ public class DistinctKeyCollector implements KeyCollector<DistinctKeyCollector>
return new ClusterByPartitions(partitions);
}
@Override
public int sketchAccuracyFactor()
{
return -spaceReductionFactor;
}
@JsonProperty("keys")
Map<RowKey, Long> getRetainedKeys()
{
@ -312,4 +318,15 @@ public class DistinctKeyCollector implements KeyCollector<DistinctKeyCollector>
return true;
}
@Override
public String toString()
{
return "DistinctKeyCollector{" +
"maxBytes=" + maxBytes +
", retainedBytes=" + retainedBytes +
", spaceReductionFactor=" + spaceReductionFactor +
", totalWeightUnadjusted=" + totalWeightUnadjusted +
'}';
}
}

View File

@ -26,7 +26,7 @@ public interface KeyCollector<CollectorType extends KeyCollector<CollectorType>>
{
/**
* Add a key with a certain weight to this collector.
*
* <br>
* See {@link ClusterByStatisticsCollector#add} for the meaning of "weight".
*/
void add(RowKey key, long weight);
@ -80,4 +80,12 @@ public interface KeyCollector<CollectorType extends KeyCollector<CollectorType>>
* or lower than the provided target.
*/
ClusterByPartitions generatePartitionsWithTargetWeight(long targetWeight);
/**
* Returns an integer which indicates the accuracy of the sketch used. The higher the value, the more accurate it is.
* This can be compared to check which sketches have been downsampled the most and are thus the least accurate. The
* exact value returned is decided by the implementation, and it is only meaningful to compare sketches of the same
* implementation in this way.
*/
int sketchAccuracyFactor();
}

View File

@ -37,7 +37,7 @@ import java.util.NoSuchElementException;
/**
* A key collector that is used when not aggregating. It uses a quantiles sketch to track keys.
*
* <br>
* The collector maintains the averageKeyLength for all keys added through {@link #add(RowKey, long)} or
* {@link #addAll(QuantilesSketchKeyCollector)}. The average is calculated as a running average and accounts for
* weight of the key added. The averageKeyLength is assumed to be unaffected by {@link #downSample()}.
@ -171,6 +171,12 @@ public class QuantilesSketchKeyCollector implements KeyCollector<QuantilesSketch
return new ClusterByPartitions(partitions);
}
@Override
public int sketchAccuracyFactor()
{
return sketch.getK();
}
/**
* Retrieves the backing sketch. Exists for usage by {@link QuantilesSketchKeyCollectorFactory}.
*/
@ -186,4 +192,15 @@ public class QuantilesSketchKeyCollector implements KeyCollector<QuantilesSketch
{
return averageKeyLength;
}
@Override
public String toString()
{
return "QuantilesSketchKeyCollector{" +
"sketch=ItemsSketch{N=" + sketch.getN() +
", K=" + sketch.getK() +
", retainedKeys=" + sketch.getNumRetained() +
"}, averageKeyLength=" + averageKeyLength +
'}';
}
}

View File

@ -81,7 +81,7 @@ export const RuleEditor = React.memo(function RuleEditor(props: RuleEditorProps)
if (!tieredReplicantsList.length) {
return (
<FormGroup>
There is no historical replication configured, data will not be loaded on hisotricals.
There is no historical replication configured, data will not be loaded on historicals.
</FormGroup>
);
}