mirror of https://github.com/apache/druid.git
feat: add the number of merge buffers used for druid emitter
Signed-off-by: TessaIO <ahmedgrati1999@gmail.com>
This commit is contained in:
parent
5de84253d8
commit
0090430981
|
@ -61,6 +61,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|
|||
|`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`query/merge/buffersUsed`|Number of merge buffers used up to merge the results of group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies|
|
||||
|
@ -102,6 +103,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|
|||
|`query/success/count`|Number of queries successfully processed.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`query/merge/buffersUsed`|Number of merge buffers used up to merge the results of group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|
||||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|
||||
|
@ -119,6 +121,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|
|||
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`query/merge/buffersUsed`|Number of merge buffers used up to merge the results of group by queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|
||||
|
||||
### Jetty
|
||||
|
|
|
@ -49,4 +49,9 @@ public interface BlockingPool<T>
|
|||
* @return count of pending requests
|
||||
*/
|
||||
long getPendingRequests();
|
||||
|
||||
/**
|
||||
* @return number of buffers used/polled from the pool at that time.
|
||||
*/
|
||||
int getUsedBufferCount();
|
||||
}
|
||||
|
|
|
@ -212,4 +212,10 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
|
|||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUsedBufferCount()
|
||||
{
|
||||
return maxSize - objects.size();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,4 +61,10 @@ public final class DummyBlockingPool<T> implements BlockingPool<T>
|
|||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUsedBufferCount()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.query;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import org.apache.druid.collections.DefaultBlockingPool;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
|
||||
|
||||
public class MetricsEmittingMergingBlockingPool<T> extends DefaultBlockingPool<T>
|
||||
implements ExecutorServiceMonitor.MetricEmitter
|
||||
{
|
||||
|
||||
public MetricsEmittingMergingBlockingPool(
|
||||
Supplier<T> generator,
|
||||
int limit,
|
||||
ExecutorServiceMonitor executorServiceMonitor
|
||||
)
|
||||
{
|
||||
super(generator, limit);
|
||||
executorServiceMonitor.add(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emitMetrics(ServiceEmitter emitter, ServiceMetricEvent.Builder metricBuilder)
|
||||
{
|
||||
emitter.emit(metricBuilder.setMetric("query/merge/buffersUsed", getUsedBufferCount()));
|
||||
emitter.emit(metricBuilder.setMetric("query/merge/totalBuffers", maxSize()));
|
||||
}
|
||||
}
|
|
@ -292,4 +292,14 @@ public class BlockingPoolTest
|
|||
r2.forEach(ReferenceCountingResourceHolder::close);
|
||||
Assert.assertEquals(pool.maxSize(), pool.getPoolSize());
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000L)
|
||||
public void testGetUsedBufferCount()
|
||||
{
|
||||
final List<ReferenceCountingResourceHolder<Integer>> holder = pool.takeBatch(6, 100L);
|
||||
Assert.assertNotNull(holder);
|
||||
Assert.assertEquals(6, pool.getUsedBufferCount());
|
||||
holder.forEach(ReferenceCountingResourceHolder::close);
|
||||
Assert.assertEquals(0, pool.getUsedBufferCount());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -147,4 +147,10 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
|
|||
{
|
||||
return takenFromMap.values();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getUsedBufferCount()
|
||||
{
|
||||
return takenFromMap.size();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.druid.client.cache.CachePopulator;
|
|||
import org.apache.druid.client.cache.CachePopulatorStats;
|
||||
import org.apache.druid.client.cache.ForegroundCachePopulator;
|
||||
import org.apache.druid.collections.BlockingPool;
|
||||
import org.apache.druid.collections.DefaultBlockingPool;
|
||||
import org.apache.druid.collections.NonBlockingPool;
|
||||
import org.apache.druid.collections.StupidPool;
|
||||
import org.apache.druid.guice.annotations.Global;
|
||||
|
@ -43,6 +42,7 @@ import org.apache.druid.java.util.common.logger.Logger;
|
|||
import org.apache.druid.offheap.OffheapBufferGenerator;
|
||||
import org.apache.druid.query.DruidProcessingConfig;
|
||||
import org.apache.druid.query.ExecutorServiceMonitor;
|
||||
import org.apache.druid.query.MetricsEmittingMergingBlockingPool;
|
||||
import org.apache.druid.query.MetricsEmittingQueryProcessingPool;
|
||||
import org.apache.druid.query.PrioritizedExecutorService;
|
||||
import org.apache.druid.query.QueryProcessingPool;
|
||||
|
@ -172,12 +172,16 @@ public class DruidProcessingModule implements Module
|
|||
);
|
||||
}
|
||||
|
||||
public static BlockingPool<ByteBuffer> createMergeBufferPool(final DruidProcessingConfig config)
|
||||
public static BlockingPool<ByteBuffer> createMergeBufferPool(
|
||||
final DruidProcessingConfig config,
|
||||
ExecutorServiceMonitor executorServiceMonitor
|
||||
)
|
||||
{
|
||||
verifyDirectMemory(config);
|
||||
return new DefaultBlockingPool<>(
|
||||
return new MetricsEmittingMergingBlockingPool<>(
|
||||
new OffheapBufferGenerator("result merging", config.intermediateComputeSizeBytes()),
|
||||
config.getNumMergeBuffers()
|
||||
config.getNumMergeBuffers(),
|
||||
executorServiceMonitor
|
||||
);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue