Emit production rate

This commit is contained in:
Arun Ramani 2024-11-19 17:16:56 +01:00
parent 1dbd005df6
commit 43145a4484
2 changed files with 83 additions and 0 deletions

View File

@ -60,6 +60,7 @@ import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.joda.time.DateTime;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
@ -96,6 +97,7 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
private final ServiceEmitter emitter;
private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
private final Pattern pattern;
private Map<KafkaTopicPartition, Long> previousLatestSequenceFromStream;
private volatile Map<KafkaTopicPartition, Long> latestSequenceFromStream;
@ -277,6 +279,29 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
return getRecordLagPerPartitionInLatestSequences(highestCurrentOffsets);
}
@Nullable
@Override
@SuppressWarnings("SSBasedInspection")
protected Map<KafkaTopicPartition, Long> getPartitionProductionRate()
{
Map<KafkaTopicPartition, Long> diff = calculateDiff(
latestSequenceFromStream,
previousLatestSequenceFromStream
);
previousLatestSequenceFromStream = latestSequenceFromStream
.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
Entry::getValue
)
);
return diff;
}
@Nullable
@Override
protected Map<KafkaTopicPartition, Long> getPartitionTimeLag()
@ -524,4 +549,23 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<KafkaTopicPartitio
return match ? new KafkaTopicPartition(isMultiTopic(), streamMatchValue, kafkaTopicPartition.partition()) : null;
}
@SuppressWarnings("SSBasedInspection")
private Map<KafkaTopicPartition, Long> calculateDiff(
@Nonnull Map<KafkaTopicPartition, Long> left,
@Nonnull Map<KafkaTopicPartition, Long> right
)
{
return left
.entrySet()
.stream()
.collect(
Collectors.toMap(
Entry::getKey,
e -> e.getValue() != null
? e.getValue() - Optional.ofNullable(right.get(e.getKey())).orElse(0L)
: 0
)
);
}
}

View File

@ -4126,6 +4126,12 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
@Nullable
protected abstract Map<PartitionIdType, Long> getPartitionTimeLag();
@Nullable
protected Map<PartitionIdType, Long> getPartitionProductionRate()
{
return null;
}
/**
* Gets highest current offsets of all the tasks (actively reading and publishing) for all partitions of the stream.
* In case if no task is reading for a partition, returns offset stored in metadata storage for that partition.
@ -4509,6 +4515,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
try {
Map<PartitionIdType, Long> partitionRecordLags = getPartitionRecordLag();
Map<PartitionIdType, Long> partitionTimeLags = getPartitionTimeLag();
Map<PartitionIdType, Long> partitionProductionRate = getPartitionProductionRate();
if (partitionRecordLags == null && partitionTimeLags == null) {
throw new ISE("Latest offsets have not been fetched");
@ -4573,9 +4580,41 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
};
if (productionRates == null) {
return;
}
Map<String, Object> metricTags = spec.getContextValue(DruidMetrics.TAGS);
for (Map.Entry<PartitionIdType, Long> entry : productionRates.entrySet()) {
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimension(DruidMetrics.PARTITION, entry.getKey())
.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
.setMetric(
StringUtils.format("ingest/%s/partitionProduction%s", type, suffix),
entry.getValue()
)
);
}
emitter.emit(
ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
.setDimensionIfNotNull(DruidMetrics.TAGS, metricTags)
.setMetric(
StringUtils.format("ingest/%s/production%s", type, suffix),
productionRates.values().stream().mapToLong(e -> e).sum()
)
);
};
// this should probably really be /count or /records or something.. but keeping like this for backwards compat
emitFn.accept(partitionRecordLags, "");
emitFn.accept(partitionTimeLags, "/time");
productionEmitFn.accept(partitionProductionRate, "");
}
catch (Exception e) {
log.warn(e, "Unable to compute lag");