Added backpressure metric (#6335)

* Added backpressure metric

* Updated channelReadable to AtomicBoolean and fixed broken test

* Moved backpressure metric logic to NettyHttpClient

* Fix placement of calculating backPressureDuration
This commit is contained in:
Shiv Toolsidass 2018-09-29 14:24:04 -07:00 committed by Gian Merlino
parent f09e718c68
commit 5a894f830b
8 changed files with 45 additions and 3 deletions

View File

@ -72,6 +72,7 @@ public class NettyHttpClient extends AbstractHttpClient
private final ResourcePool<String, ChannelFuture> pool;
private final HttpClientConfig.CompressionCodec compressionCodec;
private final Duration defaultReadTimeout;
private long backPressureStartTimeNs;
NettyHttpClient(
ResourcePool<String, ChannelFuture> pool,
@ -212,9 +213,13 @@ public class NettyHttpClient extends AbstractHttpClient
if (suspendWatermark >= 0 && resumeWatermark >= suspendWatermark) {
suspendWatermark = -1;
channel.setReadable(true);
long backPressureDuration = System.nanoTime() - backPressureStartTimeNs;
log.debug("[%s] Resumed reads from channel (chunkNum = %,d).", requestDesc, resumeChunkNum);
return backPressureDuration;
}
}
return 0; //If we didn't resume, don't know if backpressure was happening
};
response = handler.handleResponse(httpResponse, trafficCop);
if (response.isFinished()) {
@ -271,6 +276,7 @@ public class NettyHttpClient extends AbstractHttpClient
suspendWatermark = Math.max(suspendWatermark, currentChunkNum);
if (suspendWatermark > resumeWatermark) {
channel.setReadable(false);
backPressureStartTimeNs = System.nanoTime();
log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum);
}
}

View File

@ -91,7 +91,8 @@ public interface HttpResponseHandler<IntermediateType, FinalType>
* Call this to resume reading after you have suspended it.
*
* @param chunkNum chunk number corresponding to the handleChunk() or handleResponse() call from which you
* want to resume reading
* @return time, in nanoseconds, that backpressure was applied (reads from the channel were suspended), or 0 if
* this call did not resume reads
*/
void resume(long chunkNum);
long resume(long chunkNum);
}
}

View File

@ -243,6 +243,12 @@ public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMet
return reportMillisTimeMetric("query/node/ttfb", timeNs);
}
/**
 * Reports the supplied backpressure time under the "query/node/backpressure" metric name, going through the
 * same millisecond-time reporting helper that the neighbouring time metrics use.
 *
 * @param timeNs backpressure time in nanoseconds
 * @return whatever the millisecond-time reporting helper returns, so calls can be chained
 */
@Override
public QueryMetrics<QueryType> reportBackPressureTime(long timeNs)
{
  final String metricName = "query/node/backpressure";
  return reportMillisTimeMetric(metricName, timeNs);
}
@Override
public QueryMetrics<QueryType> reportNodeTime(long timeNs)
{

View File

@ -278,6 +278,11 @@ public interface QueryMetrics<QueryType extends Query<?>>
*/
QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs);
/**
* Registers "time that channel is unreadable (backpressure)" metric.
*
* @param timeNs backpressure time; presumably nanoseconds, going by the parameter name - confirm with callers
* @return a QueryMetrics object that further calls can be chained on
*/
QueryMetrics<QueryType> reportBackPressureTime(long timeNs);
/**
* Registers "node time" metric.
*/

View File

@ -208,6 +208,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics
return delegateQueryMetrics.reportNodeTimeToFirstByte(timeNs);
}
/**
 * Forwards the backpressure time to the wrapped query metrics object and hands back its result.
 *
 * @param timeNs backpressure time, passed through unchanged to the delegate
 * @return the value returned by the delegate's reportBackPressureTime call
 */
@Override
public QueryMetrics reportBackPressureTime(long timeNs)
{
  final QueryMetrics delegated = delegateQueryMetrics.reportBackPressureTime(timeNs);
  return delegated;
}
@Override
public QueryMetrics reportNodeTime(long timeNs)
{

View File

@ -207,6 +207,12 @@ public class DefaultSelectQueryMetrics implements SelectQueryMetrics
return delegateQueryMetrics.reportNodeTimeToFirstByte(timeNs);
}
/**
 * Forwards the backpressure time to the wrapped query metrics object and hands back its result.
 *
 * @param timeNs backpressure time, passed through unchanged to the delegate
 * @return the value returned by the delegate's reportBackPressureTime call
 */
@Override
public QueryMetrics reportBackPressureTime(long timeNs)
{
  final QueryMetrics delegated = delegateQueryMetrics.reportBackPressureTime(timeNs);
  return delegated;
}
@Override
public QueryMetrics reportNodeTime(long timeNs)
{

View File

@ -152,5 +152,10 @@ public class DefaultQueryMetricsTest
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/node/bytes", actualEvent.get("metric"));
Assert.assertEquals(10L, actualEvent.get("value"));
queryMetrics.reportBackPressureTime(11000001).emit(serviceEmitter);
actualEvent = cachingEmitter.getLastEmittedEvent().toMap();
Assert.assertEquals("query/node/backpressure", actualEvent.get("metric"));
Assert.assertEquals(11L, actualEvent.get("value"));
}
}

View File

@ -203,6 +203,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
{
private final AtomicLong totalByteCount = new AtomicLong(0);
private final AtomicLong queuedByteCount = new AtomicLong(0);
private final AtomicLong channelSuspendedTime = new AtomicLong(0);
private final BlockingQueue<InputStreamHolder> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();
@ -244,8 +245,9 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) {
Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?")
long backPressureTime = Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?")
.resume(holder.getChunkNum());
channelSuspendedTime.addAndGet(backPressureTime);
}
return holder.getStream();
@ -382,6 +384,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
responseMetrics.reportNodeTime(nodeTimeNs);
responseMetrics.reportNodeBytes(totalByteCount.get());
if (usingBackpressure) {
responseMetrics.reportBackPressureTime(channelSuspendedTime.get());
}
responseMetrics.emit(emitter);
synchronized (done) {
try {