diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index cc1bbd8dba2..3ab0c3f137f 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -200,21 +200,29 @@ public class DirectDruidClient implements QueryRunner final long requestStartTimeNs = System.nanoTime(); - final QueryMetrics> queryMetrics = toolChest.makeMetrics(query); - queryMetrics.server(host); - long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue(); long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query); AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED); final HttpResponseHandler responseHandler = new HttpResponseHandler() { - private long responseStartTimeNs; private final AtomicLong byteCount = new AtomicLong(0); private final BlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean done = new AtomicBoolean(false); private final AtomicReference fail = new AtomicReference<>(); + private QueryMetrics> queryMetrics; + private long responseStartTimeNs; + + private QueryMetrics> acquireResponseMetrics() + { + if (queryMetrics == null) { + queryMetrics = toolChest.makeMetrics(query); + queryMetrics.server(host); + } + return queryMetrics; + } + @Override public ClientResponse handleResponse(HttpResponse response) { @@ -223,7 +231,7 @@ public class DirectDruidClient implements QueryRunner log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId()); responseStartTimeNs = System.nanoTime(); - queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter); + acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter); try { final String responseContext = response.headers().get("X-Druid-Response-Context"); @@ -342,9 +350,10 @@ public class DirectDruidClient implements QueryRunner nodeTimeMs, byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception ); - queryMetrics.reportNodeTime(nodeTimeNs); - queryMetrics.reportNodeBytes(byteCount.get()); - queryMetrics.emit(emitter); + QueryMetrics> responseMetrics = acquireResponseMetrics(); + responseMetrics.reportNodeTime(nodeTimeNs); + responseMetrics.reportNodeBytes(byteCount.get()); + responseMetrics.emit(emitter); synchronized (done) { try { // An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out