Restore Sink Metric Emission Behaviour: Emit them per-Sink instead of per-FireHydrant (#17170)

* Emit aggregate segment processing metrics per Sink instead of per FireHydrant (see the illustrative sketch after the file summary below)

* add docs

* minor change

* checkstyle

* Fix DefaultQueryMetricsTest

* Minor changes in SinkMetricsEmittingQueryRunner

* spotbugs

* Address review comments

* Use ImmutableSet and ImmutableMap

* Create a helper class for saving state of StubServiceEmitter

* Add SinkQuerySegmentWalkerBenchmark

* Create SegmentMetrics class for tracking segment metrics

---------

Co-authored-by: Akshat Jain <akjn11@gmail.com>
Rishabh Singh, 2024-12-18 14:17:14 +05:30, committed by GitHub
parent 9ff11731c8
commit d5eb94d0e0
7 changed files with 523 additions and 68 deletions
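The change accumulates each FireHydrant's timings into a map keyed by the Sink's segment id and emits one metric per Sink after the merged runner finishes, instead of one metric per FireHydrant. The standalone sketch below (hypothetical class and method names, not Druid APIs) illustrates that accumulate-then-emit pattern in miniature; the real implementation is SinkMetricsEmittingQueryRunner further down.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch: per-hydrant timings are summed under the sink's segment id,
// and a single metric line per sink is produced at the end.
public class PerSinkAccumulationSketch
{
  private static final ConcurrentHashMap<String, AtomicLong> SEGMENT_TIME_NS = new ConcurrentHashMap<>();

  // "accumulation mode": called once per FireHydrant
  static void accumulate(String sinkSegmentId, long hydrantTimeNs)
  {
    SEGMENT_TIME_NS.computeIfAbsent(sinkSegmentId, id -> new AtomicLong()).addAndGet(hydrantTimeNs);
  }

  // "emission mode": called once per query, after all hydrants of all sinks have run
  static void emitAll()
  {
    for (Map.Entry<String, AtomicLong> entry : SEGMENT_TIME_NS.entrySet()) {
      // the real code reports this through QueryMetrics.reportSegmentTime() and emits via the ServiceEmitter
      System.out.printf("query/segment/time segment=%s valueNs=%d%n", entry.getKey(), entry.getValue().get());
    }
  }

  public static void main(String[] args)
  {
    // three hydrants of the same sink contribute to one emitted metric, not three
    accumulate("dataSource_2000-01-01_2001-01-01_A_0", 1_000_000L);
    accumulate("dataSource_2000-01-01_2001-01-01_A_0", 2_000_000L);
    accumulate("dataSource_2000-01-01_2001-01-01_A_0", 3_000_000L);
    emitAll();
  }
}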

Changed file: SinkQuerySegmentWalkerBenchmark.java (new file)

@@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.benchmark;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.LoggingEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.Result;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
import org.apache.druid.segment.realtime.sink.Committers;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class SinkQuerySegmentWalkerBenchmark
{
static {
NullHandling.initializeForTests();
}
@Param({"10", "50", "100", "200"})
private int numFireHydrants;
private final LoggingEmitter loggingEmitter = new LoggingEmitter(new Logger(LoggingEmitter.class), LoggingEmitter.Level.INFO, new DefaultObjectMapper());
private final ServiceEmitter serviceEmitter = new ServiceEmitter("test", "test", loggingEmitter);
private File cacheDir;
private Appenderator appenderator;
@Setup(Level.Trial)
public void setup() throws Exception
{
final String userConfiguredCacheDir = System.getProperty("druid.benchmark.cacheDir", System.getenv("DRUID_BENCHMARK_CACHE_DIR"));
cacheDir = new File(userConfiguredCacheDir);
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(1)
.basePersistDirectory(cacheDir)
.withServiceEmitter(serviceEmitter)
.build();
appenderator = tester.getAppenderator();
appenderator.startJob();
final SegmentIdWithShardSpec segmentIdWithShardSpec = new SegmentIdWithShardSpec(
StreamAppenderatorTester.DATASOURCE,
Intervals.of("2000/2001"),
"A",
new LinearShardSpec(0)
);
for (int i = 0; i < numFireHydrants; i++) {
final MapBasedInputRow inputRow = new MapBasedInputRow(
DateTimes.of("2000").getMillis(),
ImmutableList.of("dim"),
ImmutableMap.of(
"dim",
"bar_" + i,
"met",
1
)
);
appenderator.add(segmentIdWithShardSpec, inputRow, Suppliers.ofInstance(Committers.nil()));
}
}
@TearDown(Level.Trial)
public void tearDown() throws Exception
{
appenderator.close();
FileUtils.deleteDirectory(cacheDir);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void emitSinkMetrics(Blackhole blackhole) throws Exception
{
{
final TimeseriesQuery query1 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2001")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results =
QueryPlus.wrap(query1).run(appenderator, ResponseContext.createEmpty()).toList();
blackhole.consume(results);
serviceEmitter.flush();
}
}
}

Changed file: DefaultQueryMetrics.java

@@ -40,6 +40,10 @@ import java.util.stream.Collectors;
*/
public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
{
public static final String QUERY_WAIT_TIME = "query/wait/time";
public static final String QUERY_SEGMENT_TIME = "query/segment/time";
public static final String QUERY_SEGMENT_AND_CACHE_TIME = "query/segmentAndCache/time";
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();
@@ -235,19 +239,19 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
@Override
public QueryMetrics<QueryType> reportWaitTime(long timeNs)
{
return reportMillisTimeMetric("query/wait/time", timeNs);
return reportMillisTimeMetric(QUERY_WAIT_TIME, timeNs);
}
@Override
public QueryMetrics<QueryType> reportSegmentTime(long timeNs)
{
return reportMillisTimeMetric("query/segment/time", timeNs);
return reportMillisTimeMetric(QUERY_SEGMENT_TIME, timeNs);
}
@Override
public QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs)
{
return reportMillisTimeMetric("query/segmentAndCache/time", timeNs);
return reportMillisTimeMetric(QUERY_SEGMENT_AND_CACHE_TIME, timeNs);
}
@Override

Changed file: StubServiceEmitter.java

@@ -38,7 +38,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
{
private final List<Event> events = new ArrayList<>();
private final List<AlertEvent> alertEvents = new ArrayList<>();
private final ConcurrentHashMap<String, List<ServiceMetricEvent>> metricEvents = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, List<ServiceMetricEventSnapshot>> metricEvents = new ConcurrentHashMap<>();
public StubServiceEmitter()
{
@@ -56,7 +56,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
if (event instanceof ServiceMetricEvent) {
ServiceMetricEvent metricEvent = (ServiceMetricEvent) event;
metricEvents.computeIfAbsent(metricEvent.getMetric(), name -> new ArrayList<>())
.add(metricEvent);
.add(new ServiceMetricEventSnapshot(metricEvent));
} else if (event instanceof AlertEvent) {
alertEvents.add((AlertEvent) event);
}
@@ -76,7 +76,7 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
*
* @return Map from metric name to list of events emitted for that metric.
*/
public Map<String, List<ServiceMetricEvent>> getMetricEvents()
public Map<String, List<ServiceMetricEventSnapshot>> getMetricEvents()
{
return metricEvents;
}
@@ -96,18 +96,18 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
)
{
final List<Number> values = new ArrayList<>();
final List<ServiceMetricEvent> events =
final List<ServiceMetricEventSnapshot> events =
metricEvents.getOrDefault(metricName, Collections.emptyList());
final Map<String, Object> filters =
dimensionFilters == null ? Collections.emptyMap() : dimensionFilters;
for (ServiceMetricEvent event : events) {
for (ServiceMetricEventSnapshot event : events) {
final Map<String, Object> userDims = event.getUserDims();
boolean match = filters.keySet().stream()
.map(d -> filters.get(d).equals(userDims.get(d)))
.reduce((a, b) -> a && b)
.orElse(true);
if (match) {
values.add(event.getValue());
values.add(event.getMetricEvent().getValue());
}
}
@@ -131,4 +131,32 @@ public class StubServiceEmitter extends ServiceEmitter implements MetricsVerifie
public void close()
{
}
/**
* Helper class to encapsulate a ServiceMetricEvent and its user dimensions.
* Since {@link StubServiceEmitter} doesn't actually emit metrics but keeps them in memory,
* this helper class saves a copy of the {@link ServiceMetricEvent#userDims} of each emitted metric
* (via {@link ServiceMetricEvent#getUserDims()}), because the original map can be mutated after emission.
*/
public static class ServiceMetricEventSnapshot
{
private final ServiceMetricEvent metricEvent;
private final Map<String, Object> userDims;
public ServiceMetricEventSnapshot(ServiceMetricEvent metricEvent)
{
this.metricEvent = metricEvent;
this.userDims = metricEvent.getUserDims();
}
public ServiceMetricEvent getMetricEvent()
{
return metricEvent;
}
public Map<String, Object> getUserDims()
{
return userDims;
}
}
}

Changed file: SinkQuerySegmentWalker.java

@@ -22,6 +22,8 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.client.CachingQueryRunner;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
@@ -30,18 +32,23 @@ import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.BySegmentQueryRunner;
import org.apache.druid.query.CPUTimeMetricQueryRunner;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultQueryMetrics;
import org.apache.druid.query.DirectQueryProcessingPool;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.MetricsEmittingQueryRunner;
import org.apache.druid.query.NoopQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
@@ -52,6 +59,7 @@ import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.SinkQueryRunners;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.druid.query.spec.SpecificSegmentSpec;
@@ -69,6 +77,7 @@ import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.utils.CloseableUtils;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -76,8 +85,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;
import java.util.stream.Collectors;
/**
@@ -88,6 +100,19 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
private static final EmittingLogger log = new EmittingLogger(SinkQuerySegmentWalker.class);
private static final String CONTEXT_SKIP_INCREMENTAL_SEGMENT = "skipIncrementalSegment";
private static final Set<String> SEGMENT_QUERY_METRIC = ImmutableSet.of(DefaultQueryMetrics.QUERY_SEGMENT_TIME);
private static final Set<String> SEGMENT_CACHE_AND_WAIT_METRICS = ImmutableSet.of(
DefaultQueryMetrics.QUERY_WAIT_TIME,
DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME
);
private static final Map<String, ObjLongConsumer<? super QueryMetrics<?>>> METRICS_TO_REPORT =
ImmutableMap.of(
DefaultQueryMetrics.QUERY_SEGMENT_TIME, QueryMetrics::reportSegmentTime,
DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME, QueryMetrics::reportSegmentAndCacheTime,
DefaultQueryMetrics.QUERY_WAIT_TIME, QueryMetrics::reportWaitTime
);
private final String dataSource;
// Maintain a timeline of ids and Sinks for all the segments including the base and upgraded versions
@@ -193,6 +218,7 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker
final List<SinkSegmentReference> allSegmentReferences = new ArrayList<>();
final Map<SegmentDescriptor, SegmentId> segmentIdMap = new HashMap<>();
final LinkedHashMap<SegmentDescriptor, List<QueryRunner<T>>> allRunners = new LinkedHashMap<>();
final ConcurrentHashMap<String, SinkMetricsEmittingQueryRunner.SegmentMetrics> segmentMetricsAccumulator = new ConcurrentHashMap<>();
try {
for (final SegmentDescriptor descriptor : specs) {
@@ -231,12 +257,13 @@
descriptor,
sinkSegmentReferences.stream().map(
segmentReference -> {
QueryRunner<T> runner = new MetricsEmittingQueryRunner<>(
QueryRunner<T> runner = new SinkMetricsEmittingQueryRunner<>(
emitter,
factory.getToolchest(),
factory.createRunner(segmentReference.getSegment()),
QueryMetrics::reportSegmentTime,
queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
segmentMetricsAccumulator,
SEGMENT_QUERY_METRIC,
sinkSegmentId.toString()
);
// 1) Only use caching if data is immutable
@@ -273,13 +300,14 @@
// Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the
// *possible* caching.
runner = new MetricsEmittingQueryRunner<>(
runner = new SinkMetricsEmittingQueryRunner<>(
emitter,
factory.getToolchest(),
runner,
QueryMetrics::reportSegmentAndCacheTime,
queryMetrics -> queryMetrics.segment(sinkSegmentId.toString())
).withWaitMeasuredFromNow();
segmentMetricsAccumulator,
SEGMENT_CACHE_AND_WAIT_METRICS,
sinkSegmentId.toString()
);
// Emit CPU time metrics.
runner = CPUTimeMetricQueryRunner.safeBuild(
@@ -344,7 +372,17 @@
return new ResourceIdPopulatingQueryRunner<>(
QueryRunnerHelper.makeClosingQueryRunner(
CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner, true), toolChest),
new SinkMetricsEmittingQueryRunner<>(
emitter,
toolChest,
new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(mergedRunner, true),
toolChest
),
segmentMetricsAccumulator,
Collections.emptySet(),
null
),
toolChest,
emitter,
cpuTimeAccumulator,
@@ -415,7 +453,153 @@
// with subsegments (hydrants).
return segmentId + "_H" + hydrantNumber;
}
/**
* This class is responsible for emitting the query/segment/time, query/wait/time, and query/segmentAndCache/time metrics for a Sink.
* It accumulates the query/segment/time and query/segmentAndCache/time metrics of each FireHydrant at the Sink level.
* The query/wait/time metric is the time taken to process the first FireHydrant for the Sink.
* <p>
* This class operates in two distinct modes based on whether {@link SinkMetricsEmittingQueryRunner#segmentId} is null or non-null.
* When segmentId is non-null, it accumulates the metrics. When segmentId is null, it emits the accumulated metrics.
* <p>
* This class is derived from {@link org.apache.druid.query.MetricsEmittingQueryRunner}.
*/
private static class SinkMetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private final ServiceEmitter emitter;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final QueryRunner<T> queryRunner;
private final ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator;
private final Set<String> metricsToCompute;
@Nullable
private final String segmentId;
private final long creationTimeNs;
private SinkMetricsEmittingQueryRunner(
ServiceEmitter emitter,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
ConcurrentHashMap<String, SegmentMetrics> segmentMetricsAccumulator,
Set<String> metricsToCompute,
@Nullable String segmentId
)
{
this.emitter = emitter;
this.queryToolChest = queryToolChest;
this.queryRunner = queryRunner;
this.segmentMetricsAccumulator = segmentMetricsAccumulator;
this.metricsToCompute = metricsToCompute;
this.segmentId = segmentId;
this.creationTimeNs = System.nanoTime();
}
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext responseContext)
{
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
return Sequences.wrap(
// Use LazySequence because we want to account execution time of queryRunner.run() (it prepares the underlying
// Sequence) as part of the reported query time, i.e. we want to execute queryRunner.run() after
// `startTimeNs = System.nanoTime();`
new LazySequence<>(() -> queryRunner.run(queryWithMetrics, responseContext)),
new SequenceWrapper()
{
private long startTimeNs;
@Override
public void before()
{
startTimeNs = System.nanoTime();
}
@Override
public void after(boolean isDone, Throwable thrown)
{
if (segmentId != null) {
// accumulate metrics
final SegmentMetrics metrics = segmentMetricsAccumulator.computeIfAbsent(segmentId, id -> new SegmentMetrics());
if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_WAIT_TIME)) {
metrics.setWaitTime(startTimeNs - creationTimeNs);
}
if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_TIME)) {
metrics.addSegmentTime(System.nanoTime() - startTimeNs);
}
if (metricsToCompute.contains(DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME)) {
metrics.addSegmentAndCacheTime(System.nanoTime() - startTimeNs);
}
} else {
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();
// report accumulated metrics
for (Map.Entry<String, SegmentMetrics> segmentAndMetrics : segmentMetricsAccumulator.entrySet()) {
queryMetrics.segment(segmentAndMetrics.getKey());
for (Map.Entry<String, ObjLongConsumer<? super QueryMetrics<?>>> reportMetric : METRICS_TO_REPORT.entrySet()) {
final String metricName = reportMetric.getKey();
switch (metricName) {
case DefaultQueryMetrics.QUERY_SEGMENT_TIME:
reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentTime());
break;
case DefaultQueryMetrics.QUERY_WAIT_TIME:
reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getWaitTime());
break;
case DefaultQueryMetrics.QUERY_SEGMENT_AND_CACHE_TIME:
reportMetric.getValue().accept(queryMetrics, segmentAndMetrics.getValue().getSegmentAndCacheTime());
break;
}
}
try {
queryMetrics.emit(emitter);
}
catch (Exception e) {
// The query should not fail because of an emitter failure, so swallow the exception.
log.error(e, "Failed to emit metrics for segment[%s]", segmentAndMetrics.getKey());
}
}
}
}
}
);
}
/**
* Class to track segment related metrics during query execution.
*/
private static class SegmentMetrics
{
private final AtomicLong querySegmentTime = new AtomicLong(0);
private final AtomicLong queryWaitTime = new AtomicLong(0);
private final AtomicLong querySegmentAndCacheTime = new AtomicLong(0);
private void addSegmentTime(long time)
{
querySegmentTime.addAndGet(time);
}
private void setWaitTime(long time)
{
queryWaitTime.set(time);
}
private void addSegmentAndCacheTime(long time)
{
querySegmentAndCacheTime.addAndGet(time);
}
private long getSegmentTime()
{
return querySegmentTime.get();
}
private long getWaitTime()
{
return queryWaitTime.get();
}
private long getSegmentAndCacheTime()
{
return querySegmentAndCacheTime.get();
}
}
}
private static class SinkHolder implements Overshadowable<SinkHolder>
{
private final Sink sink;

Changed file: StreamAppenderatorTest.java

@@ -34,6 +34,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Order;
@@ -70,8 +71,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -1861,9 +1864,11 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
public void testQueryByIntervals() throws Exception
{
try (
final StubServiceEmitter serviceEmitter = new StubServiceEmitter();
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.withServiceEmitter(serviceEmitter)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@@ -1902,36 +1907,18 @@
results1
);
// Query2: 2000/2002
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(ImmutableList.of(Intervals.of("2000/2002")))
.aggregators(
Arrays.asList(
new LongSumAggregatorFactory("count", "count"),
new LongSumAggregatorFactory("met", "met")
)
)
.granularity(Granularities.DAY)
.build();
final List<Result<TimeseriesResultValue>> results2 =
QueryPlus.wrap(query2).run(appenderator, ResponseContext.createEmpty()).toList();
Assert.assertEquals(
"query2",
ImmutableList.of(
new Result<>(
DateTimes.of("2000"),
new TimeseriesResultValue(ImmutableMap.of("count", 3L, "met", 7L))
),
new Result<>(
DateTimes.of("2001"),
new TimeseriesResultValue(ImmutableMap.of("count", 4L, "met", 120L))
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Arrays.asList(
IDENTIFIERS.get(0).asSegmentId().toString(),
IDENTIFIERS.get(1).asSegmentId().toString()
)
),
results2
)
);
serviceEmitter.flush();
// Query3: 2000/2001T01
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
@@ -1961,6 +1948,19 @@
results3
);
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Arrays.asList(
IDENTIFIERS.get(0).asSegmentId().toString(),
IDENTIFIERS.get(1).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString()
)
)
);
serviceEmitter.flush();
// Query4: 2000/2001T01, 2001T03/2001T04
final TimeseriesQuery query4 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
@@ -1994,6 +1994,16 @@
),
results4
);
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Arrays.asList(
IDENTIFIERS.get(0).asSegmentId().toString(),
IDENTIFIERS.get(1).asSegmentId().toString(),
IDENTIFIERS.get(2).asSegmentId().toString()
)
)
);
}
}
@@ -2001,9 +2011,11 @@
public void testQueryBySegments() throws Exception
{
try (
StubServiceEmitter serviceEmitter = new StubServiceEmitter();
final StreamAppenderatorTester tester =
new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
.basePersistDirectory(temporaryFolder.newFolder())
.withServiceEmitter(serviceEmitter)
.build()) {
final Appenderator appenderator = tester.getAppenderator();
@@ -2052,6 +2064,17 @@
results1
);
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Collections.singletonList(
IDENTIFIERS.get(2).asSegmentId().toString()
)
)
);
serviceEmitter.flush();
// Query2: segment #2, partial
final TimeseriesQuery query2 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
@@ -2088,6 +2111,17 @@
results2
);
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Collections.singletonList(
IDENTIFIERS.get(2).asSegmentId().toString()
)
)
);
serviceEmitter.flush();
// Query3: segment #2, two disjoint intervals
final TimeseriesQuery query3 = Druids.newTimeseriesQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
@@ -2129,6 +2163,17 @@
results3
);
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Collections.singletonList(
IDENTIFIERS.get(2).asSegmentId().toString()
)
)
);
serviceEmitter.flush();
final ScanQuery query4 = Druids.newScanQueryBuilder()
.dataSource(StreamAppenderatorTester.DATASOURCE)
.intervals(
@@ -2164,6 +2209,33 @@
new Object[]{DateTimes.of("2001T03").getMillis(), "foo", 1L, 64L},
((List<Object>) ((List<Object>) results4.get(1).getEvents()).get(0)).toArray()
);
verifySinkMetrics(
serviceEmitter,
new HashSet<>(
Collections.singletonList(
IDENTIFIERS.get(2).asSegmentId().toString()
)
)
);
serviceEmitter.flush();
}
}
private void verifySinkMetrics(StubServiceEmitter emitter, Set<String> segmentIds)
{
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> events = emitter.getMetricEvents();
int segments = segmentIds.size();
Assert.assertEquals(4, events.size());
Assert.assertTrue(events.containsKey("query/cpu/time"));
Assert.assertEquals(segments, events.get("query/segment/time").size());
Assert.assertEquals(segments, events.get("query/segmentAndCache/time").size());
Assert.assertEquals(segments, events.get("query/wait/time").size());
for (String id : segmentIds) {
Assert.assertTrue(events.get("query/segment/time").stream().anyMatch(value -> value.getUserDims().containsValue(id)));
Assert.assertTrue(events.get("query/segmentAndCache/time").stream().anyMatch(value -> value.getUserDims().containsValue(id)));
Assert.assertTrue(events.get("query/wait/time").stream().anyMatch(value -> value.getUserDims().containsValue(id)));
}
}

Changed file: StreamAppenderatorTester.java

@@ -109,7 +109,8 @@
final RowIngestionMeters rowIngestionMeters,
final boolean skipBytesInMemoryOverheadCheck,
final DataSegmentAnnouncer announcer,
final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig
final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig,
final ServiceEmitter serviceEmitter
)
{
objectMapper = new DefaultObjectMapper();
@@ -145,18 +146,18 @@
.withObjectMapper(objectMapper)
.build();
tuningConfig = new TestAppenderatorConfig(
TuningConfig.DEFAULT_APPENDABLE_INDEX,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
skipBytesInMemoryOverheadCheck,
IndexSpec.DEFAULT,
0,
false,
0L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
basePersistDirectory
);
TuningConfig.DEFAULT_APPENDABLE_INDEX,
maxRowsInMemory,
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
skipBytesInMemoryOverheadCheck,
IndexSpec.DEFAULT,
0,
false,
0L,
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE,
basePersistDirectory
);
metrics = new SegmentGenerationMetrics();
queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
@@ -174,11 +175,12 @@
OffHeapMemorySegmentWriteOutMediumFactory.instance()
);
emitter = new ServiceEmitter(
emitter = serviceEmitter == null ? new ServiceEmitter(
"test",
"test",
new NoopEmitter()
);
) : serviceEmitter;
emitter.start();
EmittingLogger.registerEmitter(emitter);
dataSegmentPusher = new DataSegmentPusher()
@@ -350,6 +352,7 @@
private RowIngestionMeters rowIngestionMeters;
private boolean skipBytesInMemoryOverheadCheck;
private int delayInMilli = 0;
private ServiceEmitter serviceEmitter;
public Builder maxRowsInMemory(final int maxRowsInMemory)
{
@@ -393,6 +396,12 @@
return this;
}
public Builder withServiceEmitter(ServiceEmitter serviceEmitter)
{
this.serviceEmitter = serviceEmitter;
return this;
}
public StreamAppenderatorTester build()
{
return new StreamAppenderatorTester(
@@ -404,7 +413,8 @@
rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters,
skipBytesInMemoryOverheadCheck,
new NoopDataSegmentAnnouncer(),
CentralizedDatasourceSchemaConfig.create()
CentralizedDatasourceSchemaConfig.create(),
serviceEmitter
);
}
@@ -422,7 +432,8 @@
rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters,
skipBytesInMemoryOverheadCheck,
dataSegmentAnnouncer,
config
config,
serviceEmitter
);
}
}

Changed file: SQLAuditManagerTest.java

@@ -91,14 +91,14 @@
final AuditEntry entry = createAuditEntry("testKey", "testType", DateTimes.nowUtc());
auditManager.doAudit(entry);
Map<String, List<ServiceMetricEvent>> metricEvents = serviceEmitter.getMetricEvents();
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());
List<ServiceMetricEvent> auditMetricEvents = metricEvents.get("config/audit");
List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());
ServiceMetricEvent metric = auditMetricEvents.get(0);
ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
final AuditEntry dbEntry = lookupAuditEntryForKey("testKey");
Assert.assertNotNull(dbEntry);
@@ -120,14 +120,14 @@
Assert.assertEquals(entry, dbEntry);
// Verify emitted metrics
Map<String, List<ServiceMetricEvent>> metricEvents = serviceEmitter.getMetricEvents();
Map<String, List<StubServiceEmitter.ServiceMetricEventSnapshot>> metricEvents = serviceEmitter.getMetricEvents();
Assert.assertEquals(1, metricEvents.size());
List<ServiceMetricEvent> auditMetricEvents = metricEvents.get("config/audit");
List<StubServiceEmitter.ServiceMetricEventSnapshot> auditMetricEvents = metricEvents.get("config/audit");
Assert.assertNotNull(auditMetricEvents);
Assert.assertEquals(1, auditMetricEvents.size());
ServiceMetricEvent metric = auditMetricEvents.get(0);
ServiceMetricEvent metric = auditMetricEvents.get(0).getMetricEvent();
Assert.assertEquals(dbEntry.getKey(), metric.getUserDims().get("key"));
Assert.assertEquals(dbEntry.getType(), metric.getUserDims().get("type"));
Assert.assertNull(metric.getUserDims().get("payload"));