Add metric -- count of queries waiting for merge buffers (#15025)

Add 'mergeBuffer/pendingRequests' metric that exposes the count of waiting queries (threads) blocking in the merge buffers pools.
This commit is contained in:
kaisun2000 2023-10-09 00:26:23 -07:00 committed by GitHub
parent c483cb863d
commit e2cc1c4ad1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 104 additions and 5 deletions

View File

@ -62,6 +62,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`query/segments/count`|This metric is not enabled by default. See the `QueryMetrics` Interface for reference regarding enabling this metric. Number of segments that will be touched by the query. In the broker, it makes a plan to distribute the query to realtime tasks and historicals based on a snapshot of segment distribution state. If there are some segments moved after this snapshot is created, certain historicals and realtime tasks can report those segments as missing to the broker. The broker will resend the query to the new servers that serve those segments after move. In this case, those segments can be counted more than once in this metric.||Varies|
|`query/priority`|Assigned lane and priority, only if Laning strategy is enabled. Refer to [Laning strategies](../configuration/index.md#laning-strategies)|`lane`, `dataSource`, `type`|0|
|`sqlQuery/time`|Milliseconds taken to complete a SQL query.|`id`, `nativeQueryIds`, `dataSource`, `remoteAddress`, `success`, `engine`|< 1s|
@ -97,6 +98,7 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/failed/count`|Number of failed queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
### Real-time

View File

@ -31,7 +31,6 @@ public interface BlockingPool<T>
*
* @param elementNum number of resources to take
* @param timeoutMs maximum time to wait for resources, in milliseconds.
*
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs);
@ -40,8 +39,14 @@ public interface BlockingPool<T>
* Take resources from the pool, waiting if necessary until the elements of the given number become available.
*
* @param elementNum number of resources to take
*
* @return a list of resource holders. An empty list is returned if {@code elementNum} resources aren't available.
*/
List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum);
/**
* Returns the count of the requests waiting to acquire a batch of resources.
*
* @return count of pending requests
*/
long getPendingRequests();
}

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -48,6 +49,8 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
private final Condition notEnough;
private final int maxSize;
private final AtomicLong pendingRequests;
public DefaultBlockingPool(
Supplier<T> generator,
int limit
@ -62,6 +65,7 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
this.lock = new ReentrantLock();
this.notEnough = lock.newCondition();
this.pendingRequests = new AtomicLong();
}
@Override
@ -91,12 +95,16 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
checkInitialized();
try {
pendingRequests.incrementAndGet();
final List<T> objects = timeoutMs > 0 ? pollObjects(elementNum, timeoutMs) : pollObjects(elementNum);
return objects.stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
pendingRequests.decrementAndGet();
}
}
@Override
@ -104,11 +112,21 @@ public class DefaultBlockingPool<T> implements BlockingPool<T>
{
checkInitialized();
try {
pendingRequests.incrementAndGet();
return takeObjects(elementNum).stream().map(this::wrapObject).collect(Collectors.toList());
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
finally {
pendingRequests.incrementAndGet();
}
}
@Override
public long getPendingRequests()
{
return pendingRequests.get();
}
private List<T> pollObjects(int elementNum) throws InterruptedException

View File

@ -55,4 +55,10 @@ public final class DummyBlockingPool<T> implements BlockingPool<T>
{
throw new UnsupportedOperationException();
}
@Override
public long getPendingRequests()
{
return 0;
}
}

View File

@ -132,6 +132,12 @@ public class TestBufferPool implements NonBlockingPool<ByteBuffer>, BlockingPool
}
}
@Override
public long getPendingRequests()
{
return 0;
}
public long getOutstandingObjectCount()
{
return takenFromMap.size();

View File

@ -21,24 +21,30 @@ package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
import org.apache.druid.java.util.metrics.KeyedDiff;
import java.nio.ByteBuffer;
import java.util.Map;
public class QueryCountStatsMonitor extends AbstractMonitor
{
private final KeyedDiff keyedDiff = new KeyedDiff();
private final QueryCountStatsProvider statsProvider;
private final BlockingPool<ByteBuffer> mergeBufferPool;
@Inject
public QueryCountStatsMonitor(
QueryCountStatsProvider statsProvider
QueryCountStatsProvider statsProvider,
@Merging BlockingPool<ByteBuffer> mergeBufferPool
)
{
this.statsProvider = statsProvider;
this.mergeBufferPool = mergeBufferPool;
}
@Override
@ -65,6 +71,9 @@ public class QueryCountStatsMonitor extends AbstractMonitor
emitter.emit(builder.setMetric(diffEntry.getKey(), diffEntry.getValue()));
}
}
long pendingQueries = this.mergeBufferPool.getPendingRequests();
emitter.emit(builder.setMetric("mergeBuffer/pendingRequests", pendingQueries));
return true;
}

View File

@ -19,17 +19,27 @@
package org.apache.druid.server.metrics;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
public class QueryCountStatsMonitorTest
{
private QueryCountStatsProvider queryCountStatsProvider;
private BlockingPool<ByteBuffer> mergeBufferPool;
private ExecutorService executorService;
@Before
public void setUp()
@ -69,14 +79,24 @@ public class QueryCountStatsMonitorTest
return timedOutEmitCount;
}
};
mergeBufferPool = new DefaultBlockingPool(() -> ByteBuffer.allocate(1024), 1);
executorService = Executors.newSingleThreadExecutor();
}
@After
public void tearDown()
{
executorService.shutdown();
}
@Test
public void testMonitor()
{
final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider);
final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
emitter.flush();
// Trigger metric emission
monitor.doMonitor(emitter);
Map<String, Long> resultMap = emitter.getEvents()
@ -85,12 +105,45 @@ public class QueryCountStatsMonitorTest
event -> (String) event.toMap().get("metric"),
event -> (Long) event.toMap().get("value")
));
Assert.assertEquals(5, resultMap.size());
Assert.assertEquals(6, resultMap.size());
Assert.assertEquals(1L, (long) resultMap.get("query/success/count"));
Assert.assertEquals(2L, (long) resultMap.get("query/failed/count"));
Assert.assertEquals(3L, (long) resultMap.get("query/interrupted/count"));
Assert.assertEquals(4L, (long) resultMap.get("query/timeout/count"));
Assert.assertEquals(10L, (long) resultMap.get("query/count"));
Assert.assertEquals(0, (long) resultMap.get("mergeBuffer/pendingRequests"));
}
@Test(timeout = 2_000L)
public void testMonitoringMergeBuffer()
{
executorService.submit(() -> {
mergeBufferPool.takeBatch(10);
});
int count = 0;
try {
// wait at most 10 secs for the executor thread to block
while (mergeBufferPool.getPendingRequests() == 0) {
Thread.sleep(100);
count++;
if (count >= 20) {
break;
}
}
final QueryCountStatsMonitor monitor = new QueryCountStatsMonitor(queryCountStatsProvider, mergeBufferPool);
final StubServiceEmitter emitter = new StubServiceEmitter("DummyService", "DummyHost");
boolean ret = monitor.doMonitor(emitter);
Assert.assertTrue(ret);
List<Number> numbers = emitter.getMetricValues("mergeBuffer/pendingRequests", Collections.emptyMap());
Assert.assertEquals(1, numbers.size());
Assert.assertEquals(1, numbers.get(0).intValue());
}
catch (InterruptedException e) {
// do nothing
}
}
}