fix connection counting

This commit is contained in:
Xavier Léauté 2014-01-09 11:55:43 -08:00
parent d28f9daccb
commit 3d734944ef
2 changed files with 22 additions and 4 deletions

View File

@ -78,7 +78,7 @@
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>
<artifactId>http-client</artifactId> <artifactId>http-client</artifactId>
<version>0.8.4</version> <version>0.8.5</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.metamx</groupId> <groupId>com.metamx</groupId>

View File

@ -29,6 +29,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.Pair; import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.BaseSequence;
@ -123,12 +126,11 @@ public class DirectDruidClient<T> implements QueryRunner<T>
typeRef = types.lhs; typeRef = types.lhs;
} }
final Future<InputStream> future; final ListenableFuture<InputStream> future;
final String url = String.format("http://%s/druid/v2/", host); final String url = String.format("http://%s/druid/v2/", host);
try { try {
log.debug("Querying url[%s]", url); log.debug("Querying url[%s]", url);
openConnections.getAndIncrement();
future = httpClient future = httpClient
.post(new URL(url)) .post(new URL(url))
.setContent(objectMapper.writeValueAsBytes(query)) .setContent(objectMapper.writeValueAsBytes(query))
@ -169,11 +171,27 @@ public class DirectDruidClient<T> implements QueryRunner<T>
stopTime - startTime, stopTime - startTime,
byteCount / (0.0001 * (stopTime - startTime)) byteCount / (0.0001 * (stopTime - startTime))
); );
openConnections.getAndDecrement();
return super.done(clientResponse); return super.done(clientResponse);
} }
} }
); );
openConnections.getAndIncrement();
Futures.addCallback(
future, new FutureCallback<InputStream>()
{
@Override
public void onSuccess(InputStream result)
{
openConnections.getAndDecrement();
}
@Override
public void onFailure(Throwable t)
{
openConnections.getAndDecrement();
}
}
);
} }
catch (IOException e) { catch (IOException e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);