mirror of https://github.com/apache/druid.git
Add metrics to SegmentMetadataCache refresh (#14453)
New metrics: - `segment/metadatacache/refresh/time`: time taken to refresh segments per datasource - `segment/metadatacache/refresh/count`: number of segments being refreshed per datasource
This commit is contained in:
parent
b6d6e3b827
commit
155fde33ff
|
@ -66,6 +66,9 @@ Metrics may have additional dimensions beyond those listed above.
|
|||
|`sqlQuery/bytes`|Number of bytes returned in the SQL query response.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`| |
|
||||
|`init/serverview/time`|Time taken to initialize the broker server view. Useful to detect if brokers are taking too long to start.||Depends on the number of segments.|
|
||||
|`init/metadatacache/time`|Time taken to initialize the broker segment metadata cache. Useful to detect if brokers are taking too long to start||Depends on the number of segments.|
|
||||
|`segment/metadatacache/refresh/count`|Number of segments to refresh in broker segment metadata cache.|`dataSource`|
|
||||
|`segment/metadatacache/refresh/time`|Time taken to refresh segments in broker segment metadata cache.|`dataSource`|
|
||||
|
||||
|
||||
### Historical
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Interner;
|
||||
|
@ -48,6 +49,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
|
|||
import org.apache.druid.java.util.emitter.EmittingLogger;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
import org.apache.druid.query.DruidMetrics;
|
||||
import org.apache.druid.query.GlobalTableDataSource;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.TableDataSource;
|
||||
|
@ -88,6 +90,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
@ -707,6 +710,8 @@ public class SegmentMetadataCache
|
|||
private Set<SegmentId> refreshSegmentsForDataSource(final String dataSource, final Set<SegmentId> segments)
|
||||
throws IOException
|
||||
{
|
||||
final Stopwatch stopwatch = Stopwatch.createStarted();
|
||||
|
||||
if (!segments.stream().allMatch(segmentId -> segmentId.getDataSource().equals(dataSource))) {
|
||||
// Sanity check. We definitely expect this to pass.
|
||||
throw new ISE("'segments' must all match 'dataSource'!");
|
||||
|
@ -714,7 +719,10 @@ public class SegmentMetadataCache
|
|||
|
||||
log.debug("Refreshing metadata for dataSource[%s].", dataSource);
|
||||
|
||||
final long startTime = System.currentTimeMillis();
|
||||
final ServiceMetricEvent.Builder builder =
|
||||
new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource);
|
||||
|
||||
emitter.emit(builder.build("segment/metadatacache/refresh/count", segments.size()));
|
||||
|
||||
// Segment id string -> SegmentId object.
|
||||
final Map<String, SegmentId> segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString);
|
||||
|
@ -783,10 +791,14 @@ public class SegmentMetadataCache
|
|||
yielder.close();
|
||||
}
|
||||
|
||||
long refreshDurationMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
|
||||
|
||||
emitter.emit(builder.build("segment/metadatacache/refresh/time", refreshDurationMillis));
|
||||
|
||||
log.debug(
|
||||
"Refreshed metadata for dataSource [%s] in %,d ms (%d segments queried, %d segments left).",
|
||||
dataSource,
|
||||
System.currentTimeMillis() - startTime,
|
||||
refreshDurationMillis,
|
||||
retVal.size(),
|
||||
segments.size() - retVal.size()
|
||||
);
|
||||
|
|
|
@ -39,6 +39,8 @@ import org.apache.druid.data.input.impl.TimestampSpec;
|
|||
import org.apache.druid.java.util.common.Intervals;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.java.util.common.guava.Sequences;
|
||||
import org.apache.druid.java.util.metrics.StubServiceEmitter;
|
||||
import org.apache.druid.query.DruidMetrics;
|
||||
import org.apache.druid.query.GlobalTableDataSource;
|
||||
import org.apache.druid.query.QueryContexts;
|
||||
import org.apache.druid.query.TableDataSource;
|
||||
|
@ -1447,6 +1449,52 @@ public class SegmentMetadataCacheTest extends SegmentMetadataCacheCommon
|
|||
Assert.assertNull(schema.getDatasource("wat"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshShouldEmitMetrics() throws InterruptedException, IOException
|
||||
{
|
||||
String datasource = "xyz";
|
||||
CountDownLatch addSegmentLatch = new CountDownLatch(2);
|
||||
StubServiceEmitter emitter = new StubServiceEmitter("broker", "host");
|
||||
SegmentMetadataCache schema = new SegmentMetadataCache(
|
||||
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
|
||||
serverView,
|
||||
segmentManager,
|
||||
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
|
||||
SEGMENT_CACHE_CONFIG_DEFAULT,
|
||||
new NoopEscalator(),
|
||||
new BrokerInternalQueryConfig(),
|
||||
emitter
|
||||
)
|
||||
{
|
||||
@Override
|
||||
protected void addSegment(final DruidServerMetadata server, final DataSegment segment)
|
||||
{
|
||||
super.addSegment(server, segment);
|
||||
if (datasource.equals(segment.getDataSource())) {
|
||||
addSegmentLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void removeSegment(final DataSegment segment)
|
||||
{
|
||||
super.removeSegment(segment);
|
||||
}
|
||||
};
|
||||
|
||||
List<DataSegment> segments = ImmutableList.of(
|
||||
newSegment(datasource, 1),
|
||||
newSegment(datasource, 2)
|
||||
);
|
||||
serverView.addSegment(segments.get(0), ServerType.HISTORICAL);
|
||||
serverView.addSegment(segments.get(1), ServerType.REALTIME);
|
||||
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
|
||||
schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), Sets.newHashSet(datasource));
|
||||
|
||||
emitter.verifyEmitted("segment/metadatacache/refresh/time", ImmutableMap.of(DruidMetrics.DATASOURCE, datasource), 1);
|
||||
emitter.verifyEmitted("segment/metadatacache/refresh/count", ImmutableMap.of(DruidMetrics.DATASOURCE, datasource), 1);
|
||||
}
|
||||
|
||||
private static DataSegment newSegment(String datasource, int partitionId)
|
||||
{
|
||||
return new DataSegment(
|
||||
|
|
Loading…
Reference in New Issue