Don't use QueryMetrics from multiple threads in DirectDruidClient (fixes #4308) (#4309)

* Don't use QueryMetrics from multiple threads in DirectDruidClient

* reponseMetrics
This commit is contained in:
Roman Leventov 2017-05-23 12:07:27 -05:00 committed by Fangjin Yang
parent 2bd4c0930f
commit f97c49ba0e
1 changed files with 17 additions and 8 deletions

View File

@ -200,21 +200,29 @@ public class DirectDruidClient<T> implements QueryRunner<T>
final long requestStartTimeNs = System.nanoTime();
final QueryMetrics<? super Query<T>> queryMetrics = toolChest.makeMetrics(query);
queryMetrics.server(host);
long timeoutAt = ((Long) context.get(QUERY_FAIL_TIME)).longValue();
long maxScatterGatherBytes = QueryContexts.getMaxScatterGatherBytes(query);
AtomicLong totalBytesGathered = (AtomicLong) context.get(QUERY_TOTAL_BYTES_GATHERED);
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private long responseStartTimeNs;
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
private final AtomicReference<String> fail = new AtomicReference<>();
private QueryMetrics<? super Query<T>> queryMetrics;
private long responseStartTimeNs;
private QueryMetrics<? super Query<T>> acquireResponseMetrics()
{
if (queryMetrics == null) {
queryMetrics = toolChest.makeMetrics(query);
queryMetrics.server(host);
}
return queryMetrics;
}
@Override
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
@ -223,7 +231,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
log.debug("Initial response from url[%s] for queryId[%s]", url, query.getId());
responseStartTimeNs = System.nanoTime();
queryMetrics.reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
acquireResponseMetrics().reportNodeTimeToFirstByte(responseStartTimeNs - requestStartTimeNs).emit(emitter);
try {
final String responseContext = response.headers().get("X-Druid-Response-Context");
@ -342,9 +350,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
nodeTimeMs,
byteCount.get() / (0.001 * nodeTimeMs) // Floating math; division by zero will yield Inf, not exception
);
queryMetrics.reportNodeTime(nodeTimeNs);
queryMetrics.reportNodeBytes(byteCount.get());
queryMetrics.emit(emitter);
QueryMetrics<? super Query<T>> responseMetrics = acquireResponseMetrics();
responseMetrics.reportNodeTime(nodeTimeNs);
responseMetrics.reportNodeBytes(byteCount.get());
responseMetrics.emit(emitter);
synchronized (done) {
try {
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out