diff --git a/pom.xml b/pom.xml index 3d082a9cf93..7ca8aa0536f 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,7 @@ com.metamx http-client - 0.8.4 + 0.8.5 com.metamx diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 8befef5cabb..6c376ed479e 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -29,6 +29,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Throwables; import com.google.common.collect.Maps; import com.google.common.io.Closeables; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.IAE; import com.metamx.common.Pair; import com.metamx.common.guava.BaseSequence; @@ -123,12 +126,11 @@ public class DirectDruidClient implements QueryRunner typeRef = types.lhs; } - final Future future; + final ListenableFuture future; final String url = String.format("http://%s/druid/v2/", host); try { log.debug("Querying url[%s]", url); - openConnections.getAndIncrement(); future = httpClient .post(new URL(url)) .setContent(objectMapper.writeValueAsBytes(query)) @@ -169,11 +171,27 @@ public class DirectDruidClient implements QueryRunner stopTime - startTime, byteCount / (0.0001 * (stopTime - startTime)) ); - openConnections.getAndDecrement(); return super.done(clientResponse); } } ); + openConnections.getAndIncrement(); + Futures.addCallback( + future, new FutureCallback() + { + @Override + public void onSuccess(InputStream result) + { + openConnections.getAndDecrement(); + } + + @Override + public void onFailure(Throwable t) + { + openConnections.getAndDecrement(); + } + } + ); } catch (IOException e) { throw Throwables.propagate(e);