mirror of https://github.com/apache/druid.git
Merge pull request #932 from metamx/directDruidClientSpeedupRebase
Replace AppendableByteArrayInputStream in DirectDruidClient
This commit is contained in:
commit
fc7f1e6975
|
@ -1,6 +1,6 @@
|
||||||
/*
|
/*
|
||||||
* Druid - a distributed column store.
|
* Druid - a distributed column store.
|
||||||
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
|
* Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc.
|
||||||
*
|
*
|
||||||
* This program is free software; you can redistribute it and/or
|
* This program is free software; you can redistribute it and/or
|
||||||
* modify it under the terms of the GNU General Public License
|
* modify it under the terms of the GNU General Public License
|
||||||
|
@ -31,6 +31,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
import com.google.common.base.Throwables;
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.common.io.ByteSource;
|
||||||
import com.google.common.util.concurrent.FutureCallback;
|
import com.google.common.util.concurrent.FutureCallback;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
@ -43,9 +44,8 @@ import com.metamx.common.guava.Sequence;
|
||||||
import com.metamx.common.guava.Sequences;
|
import com.metamx.common.guava.Sequences;
|
||||||
import com.metamx.common.logger.Logger;
|
import com.metamx.common.logger.Logger;
|
||||||
import com.metamx.http.client.HttpClient;
|
import com.metamx.http.client.HttpClient;
|
||||||
import com.metamx.http.client.io.AppendableByteArrayInputStream;
|
|
||||||
import com.metamx.http.client.response.ClientResponse;
|
import com.metamx.http.client.response.ClientResponse;
|
||||||
import com.metamx.http.client.response.InputStreamResponseHandler;
|
import com.metamx.http.client.response.HttpResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHandler;
|
import com.metamx.http.client.response.StatusResponseHandler;
|
||||||
import com.metamx.http.client.response.StatusResponseHolder;
|
import com.metamx.http.client.response.StatusResponseHolder;
|
||||||
import io.druid.query.BySegmentResultValueClass;
|
import io.druid.query.BySegmentResultValueClass;
|
||||||
|
@ -57,6 +57,8 @@ import io.druid.query.QueryToolChestWarehouse;
|
||||||
import io.druid.query.QueryWatcher;
|
import io.druid.query.QueryWatcher;
|
||||||
import io.druid.query.Result;
|
import io.druid.query.Result;
|
||||||
import io.druid.query.aggregation.MetricManipulatorFns;
|
import io.druid.query.aggregation.MetricManipulatorFns;
|
||||||
|
import org.jboss.netty.buffer.ChannelBuffer;
|
||||||
|
import org.jboss.netty.buffer.ChannelBufferInputStream;
|
||||||
import org.jboss.netty.handler.codec.http.HttpChunk;
|
import org.jboss.netty.handler.codec.http.HttpChunk;
|
||||||
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
import org.jboss.netty.handler.codec.http.HttpHeaders;
|
||||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||||
|
@ -65,13 +67,19 @@ import javax.ws.rs.core.MediaType;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.io.SequenceInputStream;
|
||||||
import java.net.URL;
|
import java.net.URL;
|
||||||
|
import java.util.Enumeration;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.CancellationException;
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
|
@ -143,23 +151,19 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
|
|
||||||
try {
|
try {
|
||||||
log.debug("Querying url[%s]", url);
|
log.debug("Querying url[%s]", url);
|
||||||
future = httpClient
|
|
||||||
.post(new URL(url))
|
final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
|
||||||
.setContent(objectMapper.writeValueAsBytes(query))
|
|
||||||
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
|
|
||||||
.go(
|
|
||||||
new InputStreamResponseHandler()
|
|
||||||
{
|
{
|
||||||
long startTime;
|
private long startTime;
|
||||||
long byteCount = 0;
|
private final AtomicLong byteCount = new AtomicLong(0);
|
||||||
|
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
|
||||||
|
private final AtomicBoolean done = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientResponse<AppendableByteArrayInputStream> handleResponse(HttpResponse response)
|
public ClientResponse<InputStream> handleResponse(HttpResponse response)
|
||||||
{
|
{
|
||||||
log.debug("Initial response from url[%s]", url);
|
log.debug("Initial response from url[%s]", url);
|
||||||
startTime = System.currentTimeMillis();
|
startTime = System.currentTimeMillis();
|
||||||
byteCount += response.getContent().readableBytes();
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final String responseContext = response.headers().get("X-Druid-Response-Context");
|
final String responseContext = response.headers().get("X-Druid-Response-Context");
|
||||||
// context may be null in case of error or query timeout
|
// context may be null in case of error or query timeout
|
||||||
|
@ -172,39 +176,143 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
queue.put(new ChannelBufferInputStream(response.getContent()));
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (final IOException e) {
|
||||||
log.error(e, "Unable to parse response context from url[%s]", url);
|
log.error(e, "Error parsing response context from url [%s]", url);
|
||||||
|
return ClientResponse.<InputStream>finished(
|
||||||
|
new InputStream()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException
|
||||||
|
{
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.error(e, "Queue appending interrupted");
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
byteCount.addAndGet(response.getContent().readableBytes());
|
||||||
|
return ClientResponse.<InputStream>finished(
|
||||||
|
new SequenceInputStream(
|
||||||
|
new Enumeration<InputStream>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public boolean hasMoreElements()
|
||||||
|
{
|
||||||
|
// Done is always true until the last stream has be put in the queue.
|
||||||
|
// Then the stream should be spouting good InputStreams.
|
||||||
|
synchronized (done) {
|
||||||
|
return !done.get() || !queue.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
return super.handleResponse(response);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientResponse<AppendableByteArrayInputStream> handleChunk(
|
public InputStream nextElement()
|
||||||
ClientResponse<AppendableByteArrayInputStream> clientResponse, HttpChunk chunk
|
{
|
||||||
|
synchronized (done) {
|
||||||
|
try {
|
||||||
|
// Ensures more elements are expected via `done`
|
||||||
|
return queue.take();
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientResponse<InputStream> handleChunk(
|
||||||
|
ClientResponse<InputStream> clientResponse, HttpChunk chunk
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
final int bytes = chunk.getContent().readableBytes();
|
final ChannelBuffer channelBuffer = chunk.getContent();
|
||||||
byteCount += bytes;
|
final int bytes = channelBuffer.readableBytes();
|
||||||
return super.handleChunk(clientResponse, chunk);
|
if (bytes > 0) {
|
||||||
|
try {
|
||||||
|
queue.put(new ChannelBufferInputStream(channelBuffer));
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
byteCount.addAndGet(bytes);
|
||||||
|
}
|
||||||
|
return clientResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ClientResponse<InputStream> done(ClientResponse<AppendableByteArrayInputStream> clientResponse)
|
public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse)
|
||||||
{
|
{
|
||||||
long stopTime = System.currentTimeMillis();
|
long stopTime = System.currentTimeMillis();
|
||||||
log.debug(
|
log.debug(
|
||||||
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
|
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
|
||||||
url,
|
url,
|
||||||
byteCount,
|
byteCount.get(),
|
||||||
stopTime - startTime,
|
stopTime - startTime,
|
||||||
byteCount / (0.0001 * (stopTime - startTime))
|
byteCount.get() / (0.0001 * (stopTime - startTime))
|
||||||
);
|
);
|
||||||
return super.done(clientResponse);
|
synchronized (done) {
|
||||||
|
try {
|
||||||
|
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
|
||||||
|
// after done is set to true, regardless of the rest of the stream's state.
|
||||||
|
queue.put(ByteSource.empty().openStream());
|
||||||
|
}
|
||||||
|
catch (InterruptedException e) {
|
||||||
|
log.error(e, "Unable to put finalizing input stream into Sequence queue for url [%s]", url);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
catch (IOException e) {
|
||||||
|
// This should never happen
|
||||||
|
throw Throwables.propagate(e);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
done.set(true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ClientResponse.<InputStream>finished(clientResponse.getObj());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(final ClientResponse<InputStream> clientResponse, final Throwable e)
|
||||||
|
{
|
||||||
|
// Don't wait for lock in case the lock had something to do with the error
|
||||||
|
synchronized (done) {
|
||||||
|
done.set(true);
|
||||||
|
// Make a best effort to put a zero length buffer into the queue in case something is waiting on the take()
|
||||||
|
// If nothing is waiting on take(), this will be closed out anyways.
|
||||||
|
queue.offer(
|
||||||
|
new InputStream()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException
|
||||||
|
{
|
||||||
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
future = httpClient
|
||||||
|
.post(new URL(url))
|
||||||
|
.setContent(objectMapper.writeValueAsBytes(query))
|
||||||
|
.setHeader(
|
||||||
|
HttpHeaders.Names.CONTENT_TYPE,
|
||||||
|
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
|
||||||
|
)
|
||||||
|
.go(responseHandler);
|
||||||
|
|
||||||
queryWatcher.registerQuery(query, future);
|
queryWatcher.registerQuery(query, future);
|
||||||
|
|
||||||
|
@ -228,7 +336,10 @@ public class DirectDruidClient<T> implements QueryRunner<T>
|
||||||
StatusResponseHolder res = httpClient
|
StatusResponseHolder res = httpClient
|
||||||
.delete(new URL(cancelUrl))
|
.delete(new URL(cancelUrl))
|
||||||
.setContent(objectMapper.writeValueAsBytes(query))
|
.setContent(objectMapper.writeValueAsBytes(query))
|
||||||
.setHeader(HttpHeaders.Names.CONTENT_TYPE, isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON)
|
.setHeader(
|
||||||
|
HttpHeaders.Names.CONTENT_TYPE,
|
||||||
|
isSmile ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : MediaType.APPLICATION_JSON
|
||||||
|
)
|
||||||
.go(new StatusResponseHandler(Charsets.UTF_8))
|
.go(new StatusResponseHandler(Charsets.UTF_8))
|
||||||
.get();
|
.get();
|
||||||
if (res.getStatus().getCode() >= 500) {
|
if (res.getStatus().getCode() >= 500) {
|
||||||
|
|
Loading…
Reference in New Issue