From 0090430981a1adf43c3581704de0a796061ea0aa Mon Sep 17 00:00:00 2001 From: TessaIO Date: Sat, 27 Apr 2024 18:06:42 +0200 Subject: [PATCH] feat: add the number of merge buffers used for druid emitter Signed-off-by: TessaIO --- docs/operations/metrics.md | 3 ++ .../druid/collections/BlockingPool.java | 5 ++ .../collections/DefaultBlockingPool.java | 6 +++ .../druid/collections/DummyBlockingPool.java | 6 +++ .../MetricsEmittingMergingBlockingPool.java | 47 +++++++++++++++++++ .../druid/collections/BlockingPoolTest.java | 10 ++++ .../apache/druid/query/TestBufferPool.java | 6 +++ .../druid/guice/DruidProcessingModule.java | 12 +++-- 8 files changed, 91 insertions(+), 4 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index dc2ee952813..2ce573ee6ec 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -61,6 +61,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | +|`query/merge/buffersUsed`|Number of merge buffers used up to merge the results of group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies| @@ -102,6 +103,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`query/merge/buffersUsed`|Number of merge buffers used up to merge the results of group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| | |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| @@ -119,6 +121,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring |`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| +|`query/merge/buffersUsed`|Number of merge buffers used up to merge the results of group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| |`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.|| ### Jetty diff --git a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java index 4fb3ff66d8b..2ea9e5a5244 100644 --- a/processing/src/main/java/org/apache/druid/collections/BlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/BlockingPool.java @@ -49,4 +49,9 @@ public interface BlockingPool * @return count of pending requests */ long getPendingRequests(); + + /** + * @return number of buffers used/polled from the pool at that time. + */ + int getUsedBufferCount(); } diff --git a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java index e41a9e5d75d..ef81ecb2295 100644 --- a/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java @@ -212,4 +212,10 @@ public class DefaultBlockingPool implements BlockingPool lock.unlock(); } } + + @Override + public int getUsedBufferCount() + { + return maxSize - objects.size(); + } } diff --git a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java index 2553f9ab425..2d70d8cadd1 100644 --- a/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java +++ b/processing/src/main/java/org/apache/druid/collections/DummyBlockingPool.java @@ -61,4 +61,10 @@ public final class DummyBlockingPool implements BlockingPool { return 0; } + + @Override + public int getUsedBufferCount() + { + return 0; + } } diff --git a/processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java b/processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java new file mode 100644 index 00000000000..10a89a48e6f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/MetricsEmittingMergingBlockingPool.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query; + +import com.google.common.base.Supplier; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +public class MetricsEmittingMergingBlockingPool extends DefaultBlockingPool + implements ExecutorServiceMonitor.MetricEmitter +{ + + public MetricsEmittingMergingBlockingPool( + Supplier generator, + int limit, + ExecutorServiceMonitor executorServiceMonitor + ) + { + super(generator, limit); + executorServiceMonitor.add(this); + } + + @Override + public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder) + { + emitter.emit(metricBuilder.setMetric("query/merge/buffersUsed", getUsedBufferCount())); + emitter.emit(metricBuilder.setMetric("query/merge/totalBuffers", maxSize())); + } +} diff --git a/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java b/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java index cc5b82ba26e..3b934bef17f 100644 --- a/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java +++ b/processing/src/test/java/org/apache/druid/collections/BlockingPoolTest.java @@ -292,4 +292,14 @@ public class BlockingPoolTest r2.forEach(ReferenceCountingResourceHolder::close); Assert.assertEquals(pool.maxSize(), pool.getPoolSize()); } + + @Test(timeout = 60_000L) + public void testGetUsedBufferCount() + { + final List> holder = pool.takeBatch(6, 100L); + Assert.assertNotNull(holder); + Assert.assertEquals(6, pool.getUsedBufferCount()); + holder.forEach(ReferenceCountingResourceHolder::close); + Assert.assertEquals(0, pool.getUsedBufferCount()); + } } diff --git a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java index a650437f83f..cf53fd49ddd 100644 --- a/processing/src/test/java/org/apache/druid/query/TestBufferPool.java +++ b/processing/src/test/java/org/apache/druid/query/TestBufferPool.java @@ -147,4 +147,10 @@ public class TestBufferPool implements NonBlockingPool, BlockingPool { return takenFromMap.values(); } + + @Override + public int getUsedBufferCount() + { + return takenFromMap.size(); + } } diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java index 4879b5cd3c7..2b14d58b60b 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java @@ -31,7 +31,6 @@ import org.apache.druid.client.cache.CachePopulator; import org.apache.druid.client.cache.CachePopulatorStats; import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.collections.BlockingPool; -import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.collections.StupidPool; import org.apache.druid.guice.annotations.Global; @@ -43,6 +42,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.offheap.OffheapBufferGenerator; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ExecutorServiceMonitor; +import org.apache.druid.query.MetricsEmittingMergingBlockingPool; import org.apache.druid.query.MetricsEmittingQueryProcessingPool; import org.apache.druid.query.PrioritizedExecutorService; import org.apache.druid.query.QueryProcessingPool; @@ -172,12 +172,16 @@ public class DruidProcessingModule implements Module ); } - public static BlockingPool createMergeBufferPool(final DruidProcessingConfig config) + public static BlockingPool createMergeBufferPool( + final DruidProcessingConfig config, + ExecutorServiceMonitor executorServiceMonitor + ) { verifyDirectMemory(config); - return new DefaultBlockingPool<>( + return new MetricsEmittingMergingBlockingPool<>( new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()), - config.getNumMergeBuffers() + config.getNumMergeBuffers(), + executorServiceMonitor ); }