From 5a894f830b6d0dd7630bdab87a9fe0fd0237c704 Mon Sep 17 00:00:00 2001 From: Shiv Toolsidass Date: Sat, 29 Sep 2018 14:24:04 -0700 Subject: [PATCH] Added backpressure metric (#6335) * Added backpressure metric * Updated channelReadable to AtomicBoolean and fixed broken test * Moved backpressure metric logic to NettyHttpClient * Fix placement of calculating backPressureDuration --- .../druid/java/util/http/client/NettyHttpClient.java | 6 ++++++ .../http/client/response/HttpResponseHandler.java | 3 ++- .../org/apache/druid/query/DefaultQueryMetrics.java | 6 ++++++ .../java/org/apache/druid/query/QueryMetrics.java | 5 +++++ .../druid/query/search/DefaultSearchQueryMetrics.java | 6 ++++++ .../druid/query/select/DefaultSelectQueryMetrics.java | 6 ++++++ .../apache/druid/query/DefaultQueryMetricsTest.java | 5 +++++ .../org/apache/druid/client/DirectDruidClient.java | 11 +++++++++-- 8 files changed, 45 insertions(+), 3 deletions(-) diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java index d15e1e6aa76..4603a33ba31 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java @@ -72,6 +72,7 @@ public class NettyHttpClient extends AbstractHttpClient private final ResourcePool pool; private final HttpClientConfig.CompressionCodec compressionCodec; private final Duration defaultReadTimeout; + private long backPressureStartTimeNs; NettyHttpClient( ResourcePool pool, @@ -212,9 +213,13 @@ public class NettyHttpClient extends AbstractHttpClient if (suspendWatermark >= 0 && resumeWatermark >= suspendWatermark) { suspendWatermark = -1; channel.setReadable(true); + long backPressureDuration = System.nanoTime() - backPressureStartTimeNs; log.debug("[%s] Resumed reads from channel (chunkNum = %,d).", requestDesc, resumeChunkNum); + return backPressureDuration; } } + + return 0; //If we didn't resume, don't know if backpressure was happening }; response = handler.handleResponse(httpResponse, trafficCop); if (response.isFinished()) { @@ -271,6 +276,7 @@ public class NettyHttpClient extends AbstractHttpClient suspendWatermark = Math.max(suspendWatermark, currentChunkNum); if (suspendWatermark > resumeWatermark) { channel.setReadable(false); + backPressureStartTimeNs = System.nanoTime(); log.debug("[%s] Suspended reads from channel (chunkNum = %,d).", requestDesc, currentChunkNum); } } diff --git a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java index 02d6caa6f2f..03e54e702e0 100644 --- a/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java +++ b/java-util/src/main/java/org/apache/druid/java/util/http/client/response/HttpResponseHandler.java @@ -91,7 +91,8 @@ public interface HttpResponseHandler * Call this to resume reading after you have suspended it. * * @param chunkNum chunk number corresponding to the handleChunk() or handleResponse() call from which you + * @return time that backpressure was applied (channel was closed for reads) */ - void resume(long chunkNum); + long resume(long chunkNum); } } diff --git a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java index 6f8d3c6c2b3..b332fec98df 100644 --- a/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/DefaultQueryMetrics.java @@ -243,6 +243,12 @@ public class DefaultQueryMetrics> implements QueryMet return reportMillisTimeMetric("query/node/ttfb", timeNs); } + @Override + public QueryMetrics reportBackPressureTime(long timeNs) + { + return reportMillisTimeMetric("query/node/backpressure", timeNs); + } + @Override public QueryMetrics reportNodeTime(long timeNs) { diff --git a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java index b8d2b558b0f..c5e32b968a3 100644 --- a/processing/src/main/java/org/apache/druid/query/QueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/QueryMetrics.java @@ -278,6 +278,11 @@ public interface QueryMetrics> */ QueryMetrics reportNodeTimeToFirstByte(long timeNs); + /** + * Registers "time that channel is unreadable (backpressure)" metric. + */ + QueryMetrics reportBackPressureTime(long timeNs); + /** * Registers "node time" metric. */ diff --git a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java index 96b126a9be0..a44b004a91f 100644 --- a/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/search/DefaultSearchQueryMetrics.java @@ -208,6 +208,12 @@ public class DefaultSearchQueryMetrics implements SearchQueryMetrics return delegateQueryMetrics.reportNodeTimeToFirstByte(timeNs); } + @Override + public QueryMetrics reportBackPressureTime(long timeNs) + { + return delegateQueryMetrics.reportBackPressureTime(timeNs); + } + @Override public QueryMetrics reportNodeTime(long timeNs) { diff --git a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java index bf594127d00..d2b6b604498 100644 --- a/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java +++ b/processing/src/main/java/org/apache/druid/query/select/DefaultSelectQueryMetrics.java @@ -207,6 +207,12 @@ public class DefaultSelectQueryMetrics implements SelectQueryMetrics return delegateQueryMetrics.reportNodeTimeToFirstByte(timeNs); } + @Override + public QueryMetrics reportBackPressureTime(long timeNs) + { + return delegateQueryMetrics.reportBackPressureTime(timeNs); + } + @Override public QueryMetrics reportNodeTime(long timeNs) { diff --git a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java index ac022cacfff..16312c6ea57 100644 --- a/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java +++ b/processing/src/test/java/org/apache/druid/query/DefaultQueryMetricsTest.java @@ -152,5 +152,10 @@ public class DefaultQueryMetricsTest actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); Assert.assertEquals("query/node/bytes", actualEvent.get("metric")); Assert.assertEquals(10L, actualEvent.get("value")); + + queryMetrics.reportBackPressureTime(11000001).emit(serviceEmitter); + actualEvent = cachingEmitter.getLastEmittedEvent().toMap(); + Assert.assertEquals("query/node/backpressure", actualEvent.get("metric")); + Assert.assertEquals(11L, actualEvent.get("value")); } } diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java index 60dfd7a0c45..39396b8bd7d 100644 --- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java +++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java @@ -203,6 +203,7 @@ public class DirectDruidClient implements QueryRunner { private final AtomicLong totalByteCount = new AtomicLong(0); private final AtomicLong queuedByteCount = new AtomicLong(0); + private final AtomicLong channelSuspendedTime = new AtomicLong(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); @@ -244,8 +245,9 @@ public class DirectDruidClient implements QueryRunner final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength()); if (usingBackpressure && currentQueuedByteCount < maxQueuedBytes) { - Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?") - .resume(holder.getChunkNum()); + long backPressureTime = Preconditions.checkNotNull(trafficCopRef.get(), "No TrafficCop, how can this be?") + .resume(holder.getChunkNum()); + channelSuspendedTime.addAndGet(backPressureTime); } return holder.getStream(); @@ -382,6 +384,11 @@ public class DirectDruidClient implements QueryRunner QueryMetrics> responseMetrics = acquireResponseMetrics(); responseMetrics.reportNodeTime(nodeTimeNs); responseMetrics.reportNodeBytes(totalByteCount.get()); + + if (usingBackpressure) { + responseMetrics.reportBackPressureTime(channelSuspendedTime.get()); + } + responseMetrics.emit(emitter); synchronized (done) { try {